
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, -EINVAL is
 * returned without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}

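/*
 * Illustrative sketch (not part of the original driver): the "unless
 * zero" guard above means a counter that has dropped to zero is
 * pinned there:
 *
 *	atomic_t refs = ATOMIC_INIT(0);
 *
 *	atomic_inc_return_safe(&refs);	returns 0, refs remains 0
 *
 * This is the property the parent_ref accounting further down relies
 * on: once an image's parent reference count reaches zero it is never
 * resurrected.
 */
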
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

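/*
 * Worked example (illustrative): sizeof (RBD_SNAP_DEV_NAME_PREFIX) is
 * 6 including the terminating NUL, so the prefix consumes 5 of the
 * NAME_MAX (255 on Linux) bytes, leaving 250 bytes for a snapshot name.
 */
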
#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_DATA_POOL)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

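/*
 * Worked example (illustrative): with the four bits above,
 * RBD_FEATURES_SUPPORTED evaluates to 0x1 | 0x2 | 0x4 | 0x80 == 0x87,
 * which is what the supported_features bus attribute below reports.
 */
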
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
	OBJ_OP_WRITE,
	OBJ_OP_READ,
	OBJ_OP_DISCARD,
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	u64			object_no;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

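/*
 * Illustrative use (sketch, not from the original source): walking the
 * object requests attached to an image request, the same way the
 * aggregate transfer count is computed in rbd_img_request_complete()
 * further down:
 *
 *	struct rbd_obj_request *obj_req;
 *	u64 xferred = 0;
 *
 *	for_each_obj_request(img_req, obj_req)
 *		xferred += obj_req->xferred;
 */
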
enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);		/* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static struct bio_set		*rbd_bio_clone;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * Default to false for now, as single-major requires >= 0.75 version of
 * userspace rbd utility.
 */
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");

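/*
 * Illustrative usage (assumes a modular build): loading the driver
 * with "modprobe rbd single_major=Y" makes all mapped images share one
 * major number, with minors carved up via RBD_SINGLE_MAJOR_PART_SHIFT
 * above.
 */
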
static int rbd_img_request_submit(struct rbd_img_request *img_request);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
static void rbd_spec_put(struct rbd_spec *spec);

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}

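/*
 * Worked example (illustrative): with RBD_SINGLE_MAJOR_PART_SHIFT == 4,
 * device id 3 starts at minor 3 << 4 == 48 and owns minors 48..63 for
 * itself and its partitions; any of them maps back, e.g. 50 >> 4 == 3.
 */
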
static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}

static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
{
	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
}

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

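/*
 * Illustrative userspace usage (see Documentation/ABI/testing/
 * sysfs-bus-rbd, cited in the header above, for the authoritative
 * format; monitor address, credentials, pool and image name here are
 * placeholders):
 *
 *	echo "1.2.3.4:6789 name=admin,secret=AQB... rbd foo -" \
 *		> /sys/bus/rbd/add
 *	echo 0 > /sys/bus/rbd/remove
 */
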
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ret = 0;
	int val;
	bool ro;
	bool ro_changed = false;

	/* get_user() may sleep, so call it before taking rbd_dev->lock */
	if (get_user(val, (int __user *)(arg)))
		return -EFAULT;

	ro = val ? true : false;
	/* A snapshot mapping is always read-only; refuse to clear the flag */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	/* refuse while someone else has the device open */
	if (rbd_dev->open_count > 1) {
		ret = -EBUSY;
		goto out;
	}

	if (rbd_dev->mapping.read_only != ro) {
		rbd_dev->mapping.read_only = ro;
		ro_changed = true;
	}

out:
	spin_unlock_irq(&rbd_dev->lock);
	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
	if (ret == 0 && ro_changed)
		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

	return ret;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret = 0;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_exclusive, "exclusive"},
	{Opt_err, NULL}
};

struct rbd_options {
	int	queue_depth;
	bool	read_only;
	bool	lock_on_read;
	bool	exclusive;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		rbd_opts->queue_depth = intval;
		break;
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	case Opt_lock_on_read:
		rbd_opts->lock_on_read = true;
		break;
	case Opt_exclusive:
		rbd_opts->exclusive = true;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}

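/*
 * Illustrative example (values are placeholders): a map request such
 * as "rbd map foo -o queue_depth=128,lock_on_read,ro" would feed this
 * parser the tokens "queue_depth=128", "lock_on_read" and "ro" one at
 * a time, leaving rbd_opts with queue_depth == 128 and both flags set.
 */
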
static char* obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	default:
		return "???";
	}
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&client_mutex);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Caller must not hold rbd_client_list_lock; it is taken here while
 * unlinking the client from the list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * returns the size of an object in the image
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1U << header->obj_order;
}

static void rbd_init_layout(struct rbd_device *rbd_dev)
{
	if (rbd_dev->header.stripe_unit == 0 ||
	    rbd_dev->header.stripe_count == 0) {
		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
		rbd_dev->header.stripe_count = 1;
	}

	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
			  rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		object_prefix = kstrndup(ondisk->object_prefix,
					 sizeof(ondisk->object_prefix),
					 GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */
		snap_sizes = kmalloc_array(snap_count,
					   sizeof(*header->snap_sizes),
					   GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		rbd_init_layout(rbd_dev);
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

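/*
 * Worked example (illustrative): with snapc->snaps == { 12, 7, 3 }
 * (newest first), rbd_dev_snap_index() for snap_id 7 finds index 1,
 * while snap_id 5 is absent and yields BAD_SNAP_INDEX.
 */
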
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = rbd_obj_bytes(&rbd_dev->header);

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = rbd_obj_bytes(&rbd_dev->header);

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

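/*
 * Worked example (illustrative): with an obj_order of 22 (4 MiB
 * objects), an I/O at image offset 5 MiB lands 1 MiB into its object
 * (5 MiB & (4 MiB - 1)), and a 2 MiB request starting at image offset
 * 3 MiB is trimmed to 1 MiB so it does not cross the object boundary.
 */
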
/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec bv;
	struct bvec_iter iter;
	unsigned long flags;
	void *buf;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, iter) {
			if (pos + bv.bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(&bv, &flags);
				memset(buf + remainder, 0,
				       bv.bv_len - remainder);
				flush_dcache_page(bv.bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv.bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bio;

	bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio_advance(bio, offset);
	bio->bi_iter.bi_size = len;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_iter.bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_iter.bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

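/*
 * Worked example (illustrative): given a chain of two 8 KiB bios,
 * calling this with *offset == 6 KiB and len == 4 KiB clones the last
 * 2 KiB of the first bio and the first 2 KiB of the second.  On
 * return, *bio_src points at the second bio and *offset is 2 KiB.
 */
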
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;

	return obj_request->img_offset <
	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     kref_read(&img_request->kref));
	kref_get(&img_request->kref);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		kref_read(&img_request->kref));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);

static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
	     obj_request, obj_request->object_no, obj_request->offset,
	     obj_request->length, osd_req);
	if (obj_request_img_data_test(obj_request)) {
		WARN_ON(obj_request->callback != rbd_img_obj_callback);
		rbd_img_request_get(obj_request->img_request);
	}
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

/*
 * Set the discard flag when the img_request is a discard request
 */
static void img_request_discard_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_DISCARD, &img_request->flags);
	smp_mb();
}

static bool img_request_discard_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static enum obj_operation_type
rbd_img_request_op_type(struct rbd_img_request *img_request)
{
	if (img_request_write_test(img_request))
		return OBJ_OP_WRITE;
	else if (img_request_discard_test(img_request))
		return OBJ_OP_DISCARD;
	else
		return OBJ_OP_READ;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the entire
	 * length of the request.  A short read also implies zero-fill
	 * to the end of the request.  An error requires the whole
	 * length of the request to be reported finished with an error
	 * to the block layer.  In each case we update the xferred
	 * count to indicate the whole request was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
	}
	obj_request->xferred = length;
	obj_request_done_set(obj_request);
}

bf0d5f50
AE
1760static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1761{
37206ee5
AE
1762 dout("%s: obj %p cb %p\n", __func__, obj_request,
1763 obj_request->callback);
bf0d5f50
AE
1764 if (obj_request->callback)
1765 obj_request->callback(obj_request);
788e2df3
AE
1766 else
1767 complete_all(&obj_request->completion);
bf0d5f50
AE
1768}
1769
0dcc685e
ID
1770static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1771{
1772 obj_request->result = err;
1773 obj_request->xferred = 0;
1774 /*
1775 * kludge - mirror rbd_obj_request_submit() to match a put in
1776 * rbd_img_obj_callback()
1777 */
1778 if (obj_request_img_data_test(obj_request)) {
1779 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1780 rbd_img_request_get(obj_request->img_request);
1781 }
1782 obj_request_done_set(obj_request);
1783 rbd_obj_request_complete(obj_request);
1784}
1785
c47f9371 1786static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1787{
57acbaa7 1788 struct rbd_img_request *img_request = NULL;
a9e8ba2c 1789 struct rbd_device *rbd_dev = NULL;
57acbaa7
AE
1790 bool layered = false;
1791
1792 if (obj_request_img_data_test(obj_request)) {
1793 img_request = obj_request->img_request;
1794 layered = img_request && img_request_layered_test(img_request);
a9e8ba2c 1795 rbd_dev = img_request->rbd_dev;
57acbaa7 1796 }
8b3e1a56
AE
1797
1798 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1799 obj_request, img_request, obj_request->result,
1800 obj_request->xferred, obj_request->length);
a9e8ba2c
AE
1801 if (layered && obj_request->result == -ENOENT &&
1802 obj_request->img_offset < rbd_dev->parent_overlap)
8b3e1a56
AE
1803 rbd_img_parent_read(obj_request);
1804 else if (img_request)
6e2a4505
AE
1805 rbd_img_obj_request_read_callback(obj_request);
1806 else
1807 obj_request_done_set(obj_request);
bf0d5f50
AE
1808}
1809
c47f9371 1810static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1811{
1b83bef2
SW
1812 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1813 obj_request->result, obj_request->length);
1814 /*
8b3e1a56
AE
1815 * There is no such thing as a successful short write. Set
1816 * it to our originally-requested length.
1b83bef2
SW
1817 */
1818 obj_request->xferred = obj_request->length;
07741308 1819 obj_request_done_set(obj_request);
bf0d5f50
AE
1820}
1821
90e98c52
GZ
1822static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1823{
1824 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1825 obj_request->result, obj_request->length);
1826 /*
1827 * There is no such thing as a successful short discard. Set
1828 * it to our originally-requested length.
1829 */
1830 obj_request->xferred = obj_request->length;
d0265de7
JD
1831 /* discarding a non-existent object is not a problem */
1832 if (obj_request->result == -ENOENT)
1833 obj_request->result = 0;
90e98c52
GZ
1834 obj_request_done_set(obj_request);
1835}
1836
fbfab539
AE
1837/*
1838 * For a simple stat call there's nothing to do. We'll do more if
1839 * this is part of a write sequence for a layered image.
1840 */
c47f9371 1841static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
fbfab539 1842{
37206ee5 1843 dout("%s: obj %p\n", __func__, obj_request);
fbfab539
AE
1844 obj_request_done_set(obj_request);
1845}
1846
2761713d
ID
1847static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1848{
1849 dout("%s: obj %p\n", __func__, obj_request);
1850
1851 if (obj_request_img_data_test(obj_request))
1852 rbd_osd_copyup_callback(obj_request);
1853 else
1854 obj_request_done_set(obj_request);
1855}
1856
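/*
 * Completion callback for every rbd OSD request (installed as
 * r_callback in __rbd_osd_req_create()).  Records the result and
 * transfer count, then dispatches on the opcode of the first op.
 */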
85e084fe 1857static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
bf0d5f50
AE
1858{
1859 struct rbd_obj_request *obj_request = osd_req->r_priv;
bf0d5f50
AE
1860 u16 opcode;
1861
85e084fe 1862 dout("%s: osd_req %p\n", __func__, osd_req);
bf0d5f50 1863 rbd_assert(osd_req == obj_request->osd_req);
57acbaa7
AE
1864 if (obj_request_img_data_test(obj_request)) {
1865 rbd_assert(obj_request->img_request);
1866 rbd_assert(obj_request->which != BAD_WHICH);
1867 } else {
1868 rbd_assert(obj_request->which == BAD_WHICH);
1869 }
bf0d5f50 1870
1b83bef2
SW
1871 if (osd_req->r_result < 0)
1872 obj_request->result = osd_req->r_result;
bf0d5f50 1873
c47f9371
AE
1874 /*
1875 * We support a 64-bit length, but ultimately it has to be
7ad18afa
CH
1876 * passed to the block layer, which just supports a 32-bit
1877 * length field.
c47f9371 1878 */
7665d85b 1879 obj_request->xferred = osd_req->r_ops[0].outdata_len;
8b3e1a56 1880 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
0ccd5926 1881
79528734 1882 opcode = osd_req->r_ops[0].op;
bf0d5f50
AE
1883 switch (opcode) {
1884 case CEPH_OSD_OP_READ:
c47f9371 1885 rbd_osd_read_callback(obj_request);
bf0d5f50 1886 break;
0ccd5926 1887 case CEPH_OSD_OP_SETALLOCHINT:
e30b7577
ID
1888 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1889 osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
0ccd5926 1890 /* fall through */
bf0d5f50 1891 case CEPH_OSD_OP_WRITE:
e30b7577 1892 case CEPH_OSD_OP_WRITEFULL:
c47f9371 1893 rbd_osd_write_callback(obj_request);
bf0d5f50 1894 break;
fbfab539 1895 case CEPH_OSD_OP_STAT:
c47f9371 1896 rbd_osd_stat_callback(obj_request);
fbfab539 1897 break;
90e98c52
GZ
1898 case CEPH_OSD_OP_DELETE:
1899 case CEPH_OSD_OP_TRUNCATE:
1900 case CEPH_OSD_OP_ZERO:
1901 rbd_osd_discard_callback(obj_request);
1902 break;
36be9a76 1903 case CEPH_OSD_OP_CALL:
2761713d
ID
1904 rbd_osd_call_callback(obj_request);
1905 break;
bf0d5f50 1906 default:
a90bb0c1
ID
1907 rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
1908 obj_request->object_no, opcode);
bf0d5f50
AE
1909 break;
1910 }
1911
07741308 1912 if (obj_request_done_test(obj_request))
bf0d5f50
AE
1913 rbd_obj_request_complete(obj_request);
1914}
1915
9d4df01f 1916static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
430c28c3 1917{
8c042b0d 1918 struct ceph_osd_request *osd_req = obj_request->osd_req;
430c28c3 1919
7c84883a
ID
1920 rbd_assert(obj_request_img_data_test(obj_request));
1921 osd_req->r_snapid = obj_request->img_request->snap_id;
9d4df01f
AE
1922}
1923
1924static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1925{
9d4df01f 1926 struct ceph_osd_request *osd_req = obj_request->osd_req;
9d4df01f 1927
1134e091 1928 ktime_get_real_ts(&osd_req->r_mtime);
bb873b53 1929 osd_req->r_data_offset = obj_request->offset;
430c28c3
AE
1930}
1931
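/*
 * Common OSD request allocation.  Format 1 and format 2 images name
 * their data objects differently, so the object name is generated
 * from the per-format template plus the object number.
 */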
bc81207e
ID
1932static struct ceph_osd_request *
1933__rbd_osd_req_create(struct rbd_device *rbd_dev,
1934 struct ceph_snap_context *snapc,
1935 int num_ops, unsigned int flags,
1936 struct rbd_obj_request *obj_request)
1937{
1938 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1939 struct ceph_osd_request *req;
a90bb0c1
ID
1940 const char *name_format = rbd_dev->image_format == 1 ?
1941 RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
bc81207e
ID
1942
1943 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1944 if (!req)
1945 return NULL;
1946
1947 req->r_flags = flags;
1948 req->r_callback = rbd_osd_req_callback;
1949 req->r_priv = obj_request;
1950
1951 req->r_base_oloc.pool = rbd_dev->layout.pool_id;
a90bb0c1
ID
1952 if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1953 rbd_dev->header.object_prefix, obj_request->object_no))
bc81207e
ID
1954 goto err_req;
1955
1956 if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1957 goto err_req;
1958
1959 return req;
1960
1961err_req:
1962 ceph_osdc_put_request(req);
1963 return NULL;
1964}
1965
0ccd5926
ID
1966/*
1967 * Create an osd request. A read request has one osd op (read).
1968 * A write request has either one (watch) or two (hint+write) osd ops.
1969 * (All rbd data writes are prefixed with an allocation hint op, but
1970 * technically osd watch is a write request, hence this distinction.)
1971 */
bf0d5f50
AE
1972static struct ceph_osd_request *rbd_osd_req_create(
1973 struct rbd_device *rbd_dev,
6d2940c8 1974 enum obj_operation_type op_type,
deb236b3 1975 unsigned int num_ops,
430c28c3 1976 struct rbd_obj_request *obj_request)
bf0d5f50 1977{
bf0d5f50 1978 struct ceph_snap_context *snapc = NULL;
bf0d5f50 1979
90e98c52
GZ
1980 if (obj_request_img_data_test(obj_request) &&
1981 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
6365d33a 1982 struct rbd_img_request *img_request = obj_request->img_request;
90e98c52
GZ
1983 if (op_type == OBJ_OP_WRITE) {
1984 rbd_assert(img_request_write_test(img_request));
1985 } else {
1986 rbd_assert(img_request_discard_test(img_request));
1987 }
6d2940c8 1988 snapc = img_request->snapc;
bf0d5f50
AE
1989 }
1990
6d2940c8 1991 rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
deb236b3 1992
bc81207e
ID
1993 return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
1994 (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
54ea0046 1995 CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
bf0d5f50
AE
1996}
1997
0eefd470 1998/*
d3246fb0
JD
1999 * Create a copyup osd request based on the information in the object
2000 * request supplied. A copyup request has two or three osd ops, a
2001 * copyup method call, potentially a hint op, and a write or truncate
2002 * or zero op.
0eefd470
AE
2003 */
2004static struct ceph_osd_request *
2005rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
2006{
2007 struct rbd_img_request *img_request;
d3246fb0 2008 int num_osd_ops = 3;
0eefd470
AE
2009
2010 rbd_assert(obj_request_img_data_test(obj_request));
2011 img_request = obj_request->img_request;
2012 rbd_assert(img_request);
d3246fb0
JD
2013 rbd_assert(img_request_write_test(img_request) ||
2014 img_request_discard_test(img_request));
0eefd470 2015
d3246fb0
JD
2016 if (img_request_discard_test(img_request))
2017 num_osd_ops = 2;
2018
bc81207e
ID
2019 return __rbd_osd_req_create(img_request->rbd_dev,
2020 img_request->snapc, num_osd_ops,
54ea0046 2021 CEPH_OSD_FLAG_WRITE, obj_request);
0eefd470
AE
2022}
2023
bf0d5f50
AE
2024static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2025{
2026 ceph_osdc_put_request(osd_req);
2027}
2028
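/*
 * Object requests are reference counted; the initial reference taken
 * by kref_init() here is dropped through rbd_obj_request_put(), which
 * invokes rbd_obj_request_destroy() below.
 */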
6c696d85
ID
2029static struct rbd_obj_request *
2030rbd_obj_request_create(enum obj_request_type type)
bf0d5f50
AE
2031{
2032 struct rbd_obj_request *obj_request;
bf0d5f50
AE
2033
2034 rbd_assert(obj_request_type_valid(type));
2035
5a60e876 2036 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
6c696d85 2037 if (!obj_request)
f907ad55 2038 return NULL;
f907ad55 2039
bf0d5f50
AE
2040 obj_request->which = BAD_WHICH;
2041 obj_request->type = type;
2042 INIT_LIST_HEAD(&obj_request->links);
788e2df3 2043 init_completion(&obj_request->completion);
bf0d5f50
AE
2044 kref_init(&obj_request->kref);
2045
67e2b652 2046 dout("%s %p\n", __func__, obj_request);
bf0d5f50
AE
2047 return obj_request;
2048}
2049
2050static void rbd_obj_request_destroy(struct kref *kref)
2051{
2052 struct rbd_obj_request *obj_request;
2053
2054 obj_request = container_of(kref, struct rbd_obj_request, kref);
2055
37206ee5
AE
2056 dout("%s: obj %p\n", __func__, obj_request);
2057
bf0d5f50
AE
2058 rbd_assert(obj_request->img_request == NULL);
2059 rbd_assert(obj_request->which == BAD_WHICH);
2060
2061 if (obj_request->osd_req)
2062 rbd_osd_req_destroy(obj_request->osd_req);
2063
2064 rbd_assert(obj_request_type_valid(obj_request->type));
2065 switch (obj_request->type) {
9969ebc5
AE
2066 case OBJ_REQUEST_NODATA:
2067 break; /* Nothing to do */
bf0d5f50
AE
2068 case OBJ_REQUEST_BIO:
2069 if (obj_request->bio_list)
2070 bio_chain_put(obj_request->bio_list);
2071 break;
788e2df3 2072 case OBJ_REQUEST_PAGES:
04dc923c
ID
2073 /* img_data requests don't own their page array */
2074 if (obj_request->pages &&
2075 !obj_request_img_data_test(obj_request))
788e2df3
AE
2076 ceph_release_page_vector(obj_request->pages,
2077 obj_request->page_count);
2078 break;
bf0d5f50
AE
2079 }
2080
868311b1 2081 kmem_cache_free(rbd_obj_request_cache, obj_request);
bf0d5f50
AE
2082}
2083
fb65d228
AE
2084/* It's OK to call this for a device with no parent */
2085
2086static void rbd_spec_put(struct rbd_spec *spec);
2087static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2088{
2089 rbd_dev_remove_parent(rbd_dev);
2090 rbd_spec_put(rbd_dev->parent_spec);
2091 rbd_dev->parent_spec = NULL;
2092 rbd_dev->parent_overlap = 0;
2093}
2094
a2acd00e
AE
2095/*
2096 * Parent image reference counting is used to determine when an
2097 * image's parent fields can be safely torn down--after there are no
2098 * more in-flight requests to the parent image. When the last
2099 * reference is dropped, cleaning them up is safe.
2100 */
2101static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2102{
2103 int counter;
2104
2105 if (!rbd_dev->parent_spec)
2106 return;
2107
2108 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2109 if (counter > 0)
2110 return;
2111
2112 /* Last reference; clean up parent data structures */
2113
2114 if (!counter)
2115 rbd_dev_unparent(rbd_dev);
2116 else
9584d508 2117 rbd_warn(rbd_dev, "parent reference underflow");
a2acd00e
AE
2118}
2119
2120/*
2121 * If an image has a non-zero parent overlap, get a reference to its
2122 * parent.
2123 *
2124 * Returns true if the rbd device has a parent with a non-zero
2125 * overlap and a reference for it was successfully taken, or
2126 * false otherwise.
2127 */
2128static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2129{
ae43e9d0 2130 int counter = 0;
a2acd00e
AE
2131
2132 if (!rbd_dev->parent_spec)
2133 return false;
2134
ae43e9d0
ID
2135 down_read(&rbd_dev->header_rwsem);
2136 if (rbd_dev->parent_overlap)
2137 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2138 up_read(&rbd_dev->header_rwsem);
a2acd00e
AE
2139
2140 if (counter < 0)
9584d508 2141 rbd_warn(rbd_dev, "parent reference overflow");
a2acd00e 2142
ae43e9d0 2143 return counter > 0;
a2acd00e
AE
2144}
2145
bf0d5f50
AE
2146/*
2147 * Caller is responsible for filling in the list of object requests
2148 * that comprises the image request, and the Linux request pointer
2149 * (if there is one).
2150 */
cc344fa1
AE
2151static struct rbd_img_request *rbd_img_request_create(
2152 struct rbd_device *rbd_dev,
bf0d5f50 2153 u64 offset, u64 length,
6d2940c8 2154 enum obj_operation_type op_type,
4e752f0a 2155 struct ceph_snap_context *snapc)
bf0d5f50
AE
2156{
2157 struct rbd_img_request *img_request;
bf0d5f50 2158
7a716aac 2159 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
bf0d5f50
AE
2160 if (!img_request)
2161 return NULL;
2162
bf0d5f50
AE
2163 img_request->rq = NULL;
2164 img_request->rbd_dev = rbd_dev;
2165 img_request->offset = offset;
2166 img_request->length = length;
0c425248 2167 img_request->flags = 0;
90e98c52
GZ
2168 if (op_type == OBJ_OP_DISCARD) {
2169 img_request_discard_set(img_request);
2170 img_request->snapc = snapc;
2171 } else if (op_type == OBJ_OP_WRITE) {
0c425248 2172 img_request_write_set(img_request);
4e752f0a 2173 img_request->snapc = snapc;
0c425248 2174 } else {
bf0d5f50 2175 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 2176 }
a2acd00e 2177 if (rbd_dev_parent_get(rbd_dev))
d0b2e944 2178 img_request_layered_set(img_request);
bf0d5f50
AE
2179 spin_lock_init(&img_request->completion_lock);
2180 img_request->next_completion = 0;
2181 img_request->callback = NULL;
a5a337d4 2182 img_request->result = 0;
bf0d5f50
AE
2183 img_request->obj_request_count = 0;
2184 INIT_LIST_HEAD(&img_request->obj_requests);
2185 kref_init(&img_request->kref);
2186
37206ee5 2187 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
6d2940c8 2188 obj_op_name(op_type), offset, length, img_request);
37206ee5 2189
bf0d5f50
AE
2190 return img_request;
2191}
2192
2193static void rbd_img_request_destroy(struct kref *kref)
2194{
2195 struct rbd_img_request *img_request;
2196 struct rbd_obj_request *obj_request;
2197 struct rbd_obj_request *next_obj_request;
2198
2199 img_request = container_of(kref, struct rbd_img_request, kref);
2200
37206ee5
AE
2201 dout("%s: img %p\n", __func__, img_request);
2202
bf0d5f50
AE
2203 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2204 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 2205 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 2206
a2acd00e
AE
2207 if (img_request_layered_test(img_request)) {
2208 img_request_layered_clear(img_request);
2209 rbd_dev_parent_put(img_request->rbd_dev);
2210 }
2211
bef95455
JD
2212 if (img_request_write_test(img_request) ||
2213 img_request_discard_test(img_request))
812164f8 2214 ceph_put_snap_context(img_request->snapc);
bf0d5f50 2215
1c2a9dfe 2216 kmem_cache_free(rbd_img_request_cache, img_request);
bf0d5f50
AE
2217}
2218
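/*
 * A read from the parent image is modeled as a separate image request
 * marked IMG_REQ_CHILD.  It holds a reference on the originating
 * object request until the parent read completes.
 */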
e93f3152
AE
2219static struct rbd_img_request *rbd_parent_request_create(
2220 struct rbd_obj_request *obj_request,
2221 u64 img_offset, u64 length)
2222{
2223 struct rbd_img_request *parent_request;
2224 struct rbd_device *rbd_dev;
2225
2226 rbd_assert(obj_request->img_request);
2227 rbd_dev = obj_request->img_request->rbd_dev;
2228
4e752f0a 2229 parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
6d2940c8 2230 length, OBJ_OP_READ, NULL);
e93f3152
AE
2231 if (!parent_request)
2232 return NULL;
2233
2234 img_request_child_set(parent_request);
2235 rbd_obj_request_get(obj_request);
2236 parent_request->obj_request = obj_request;
2237
2238 return parent_request;
2239}
2240
2241static void rbd_parent_request_destroy(struct kref *kref)
2242{
2243 struct rbd_img_request *parent_request;
2244 struct rbd_obj_request *orig_request;
2245
2246 parent_request = container_of(kref, struct rbd_img_request, kref);
2247 orig_request = parent_request->obj_request;
2248
2249 parent_request->obj_request = NULL;
2250 rbd_obj_request_put(orig_request);
2251 img_request_child_clear(parent_request);
2252
2253 rbd_img_request_destroy(kref);
2254}
2255
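/*
 * Retire a single object request.  For a child image request this
 * just reports whether more object requests remain; for a top-level
 * request the byte count is fed to the block layer.
 */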
1217857f
AE
2256static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2257{
6365d33a 2258 struct rbd_img_request *img_request;
1217857f
AE
2259 unsigned int xferred;
2260 int result;
8b3e1a56 2261 bool more;
1217857f 2262
6365d33a
AE
2263 rbd_assert(obj_request_img_data_test(obj_request));
2264 img_request = obj_request->img_request;
2265
1217857f
AE
2266 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2267 xferred = (unsigned int)obj_request->xferred;
2268 result = obj_request->result;
2269 if (result) {
2270 struct rbd_device *rbd_dev = img_request->rbd_dev;
6d2940c8
GZ
2271 enum obj_operation_type op_type;
2272
90e98c52
GZ
2273 if (img_request_discard_test(img_request))
2274 op_type = OBJ_OP_DISCARD;
2275 else if (img_request_write_test(img_request))
2276 op_type = OBJ_OP_WRITE;
2277 else
2278 op_type = OBJ_OP_READ;
1217857f 2279
9584d508 2280 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
6d2940c8
GZ
2281 obj_op_name(op_type), obj_request->length,
2282 obj_request->img_offset, obj_request->offset);
9584d508 2283 rbd_warn(rbd_dev, " result %d xferred %x",
1217857f
AE
2284 result, xferred);
2285 if (!img_request->result)
2286 img_request->result = result;
082a75da
ID
2287 /*
2288 * Need to end I/O on the entire obj_request worth of
2289 * bytes in case of error.
2290 */
2291 xferred = obj_request->length;
1217857f
AE
2292 }
2293
8b3e1a56
AE
2294 if (img_request_child_test(img_request)) {
2295 rbd_assert(img_request->obj_request != NULL);
2296 more = obj_request->which < img_request->obj_request_count - 1;
2297 } else {
2a842aca
CH
2298 blk_status_t status = errno_to_blk_status(result);
2299
8b3e1a56 2300 rbd_assert(img_request->rq != NULL);
7ad18afa 2301
2a842aca 2302 more = blk_update_request(img_request->rq, status, xferred);
7ad18afa 2303 if (!more)
2a842aca 2304 __blk_mq_end_request(img_request->rq, status);
8b3e1a56
AE
2305 }
2306
2307 return more;
1217857f
AE
2308}
2309
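/*
 * Per-object completion callback.  Object requests may finish out of
 * order, but they are retired strictly in order: only the contiguous
 * run of done requests starting at next_completion is ended here.
 */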
2169238d
AE
2310static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2311{
2312 struct rbd_img_request *img_request;
2313 u32 which = obj_request->which;
2314 bool more = true;
2315
6365d33a 2316 rbd_assert(obj_request_img_data_test(obj_request));
2169238d
AE
2317 img_request = obj_request->img_request;
2318
2319 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2320 rbd_assert(img_request != NULL);
2169238d
AE
2321 rbd_assert(img_request->obj_request_count > 0);
2322 rbd_assert(which != BAD_WHICH);
2323 rbd_assert(which < img_request->obj_request_count);
2169238d
AE
2324
2325 spin_lock_irq(&img_request->completion_lock);
2326 if (which != img_request->next_completion)
2327 goto out;
2328
2329 for_each_obj_request_from(img_request, obj_request) {
2169238d
AE
2330 rbd_assert(more);
2331 rbd_assert(which < img_request->obj_request_count);
2332
2333 if (!obj_request_done_test(obj_request))
2334 break;
1217857f 2335 more = rbd_img_obj_end_request(obj_request);
2169238d
AE
2336 which++;
2337 }
2338
2339 rbd_assert(more ^ (which == img_request->obj_request_count));
2340 img_request->next_completion = which;
2341out:
2342 spin_unlock_irq(&img_request->completion_lock);
0f2d5be7 2343 rbd_img_request_put(img_request);
2169238d
AE
2344
2345 if (!more)
2346 rbd_img_request_complete(img_request);
2347}
2348
3b434a2a
JD
2349/*
2350 * Add individual osd ops to the given ceph_osd_request and prepare
2351 * them for submission. num_ops is the current number of
 2352	 * osd operations already added to the osd request.
2353 */
2354static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2355 struct ceph_osd_request *osd_request,
2356 enum obj_operation_type op_type,
2357 unsigned int num_ops)
2358{
2359 struct rbd_img_request *img_request = obj_request->img_request;
2360 struct rbd_device *rbd_dev = img_request->rbd_dev;
2361 u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2362 u64 offset = obj_request->offset;
2363 u64 length = obj_request->length;
2364 u64 img_end;
2365 u16 opcode;
2366
2367 if (op_type == OBJ_OP_DISCARD) {
d3246fb0
JD
2368 if (!offset && length == object_size &&
2369 (!img_request_layered_test(img_request) ||
2370 !obj_request_overlaps_parent(obj_request))) {
3b434a2a
JD
2371 opcode = CEPH_OSD_OP_DELETE;
2372 } else if ((offset + length == object_size)) {
2373 opcode = CEPH_OSD_OP_TRUNCATE;
2374 } else {
2375 down_read(&rbd_dev->header_rwsem);
2376 img_end = rbd_dev->header.image_size;
2377 up_read(&rbd_dev->header_rwsem);
2378
2379 if (obj_request->img_offset + length == img_end)
2380 opcode = CEPH_OSD_OP_TRUNCATE;
2381 else
2382 opcode = CEPH_OSD_OP_ZERO;
2383 }
2384 } else if (op_type == OBJ_OP_WRITE) {
e30b7577
ID
2385 if (!offset && length == object_size)
2386 opcode = CEPH_OSD_OP_WRITEFULL;
2387 else
2388 opcode = CEPH_OSD_OP_WRITE;
3b434a2a
JD
2389 osd_req_op_alloc_hint_init(osd_request, num_ops,
2390 object_size, object_size);
2391 num_ops++;
2392 } else {
2393 opcode = CEPH_OSD_OP_READ;
2394 }
2395
7e868b6e 2396 if (opcode == CEPH_OSD_OP_DELETE)
144cba14 2397 osd_req_op_init(osd_request, num_ops, opcode, 0);
7e868b6e
ID
2398 else
2399 osd_req_op_extent_init(osd_request, num_ops, opcode,
2400 offset, length, 0, 0);
2401
3b434a2a
JD
2402 if (obj_request->type == OBJ_REQUEST_BIO)
2403 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2404 obj_request->bio_list, length);
2405 else if (obj_request->type == OBJ_REQUEST_PAGES)
2406 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2407 obj_request->pages, length,
2408 offset & ~PAGE_MASK, false, false);
2409
2410 /* Discards are also writes */
2411 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2412 rbd_osd_req_format_write(obj_request);
2413 else
2414 rbd_osd_req_format_read(obj_request);
2415}
2416
f1a4739f
AE
2417/*
2418 * Split up an image request into one or more object requests, each
2419 * to a different object. The "type" parameter indicates whether
2420 * "data_desc" is the pointer to the head of a list of bio
2421 * structures, or the base of a page array. In either case this
2422 * function assumes data_desc describes memory sufficient to hold
2423 * all data described by the image request.
2424 */
2425static int rbd_img_request_fill(struct rbd_img_request *img_request,
2426 enum obj_request_type type,
2427 void *data_desc)
bf0d5f50
AE
2428{
2429 struct rbd_device *rbd_dev = img_request->rbd_dev;
2430 struct rbd_obj_request *obj_request = NULL;
2431 struct rbd_obj_request *next_obj_request;
a158073c 2432 struct bio *bio_list = NULL;
f1a4739f 2433 unsigned int bio_offset = 0;
a158073c 2434 struct page **pages = NULL;
6d2940c8 2435 enum obj_operation_type op_type;
7da22d29 2436 u64 img_offset;
bf0d5f50 2437 u64 resid;
bf0d5f50 2438
f1a4739f
AE
2439 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2440 (int)type, data_desc);
37206ee5 2441
7da22d29 2442 img_offset = img_request->offset;
bf0d5f50 2443 resid = img_request->length;
4dda41d3 2444 rbd_assert(resid > 0);
3b434a2a 2445 op_type = rbd_img_request_op_type(img_request);
f1a4739f
AE
2446
2447 if (type == OBJ_REQUEST_BIO) {
2448 bio_list = data_desc;
4f024f37
KO
2449 rbd_assert(img_offset ==
2450 bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
90e98c52 2451 } else if (type == OBJ_REQUEST_PAGES) {
f1a4739f
AE
2452 pages = data_desc;
2453 }
2454
bf0d5f50 2455 while (resid) {
2fa12320 2456 struct ceph_osd_request *osd_req;
a90bb0c1 2457 u64 object_no = img_offset >> rbd_dev->header.obj_order;
67e2b652
ID
2458 u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2459 u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 2460
6c696d85 2461 obj_request = rbd_obj_request_create(type);
bf0d5f50
AE
2462 if (!obj_request)
2463 goto out_unwind;
62054da6 2464
a90bb0c1 2465 obj_request->object_no = object_no;
67e2b652
ID
2466 obj_request->offset = offset;
2467 obj_request->length = length;
2468
03507db6
JD
2469 /*
2470 * set obj_request->img_request before creating the
2471 * osd_request so that it gets the right snapc
2472 */
2473 rbd_img_obj_request_add(img_request, obj_request);
bf0d5f50 2474
f1a4739f
AE
2475 if (type == OBJ_REQUEST_BIO) {
2476 unsigned int clone_size;
2477
2478 rbd_assert(length <= (u64)UINT_MAX);
2479 clone_size = (unsigned int)length;
2480 obj_request->bio_list =
2481 bio_chain_clone_range(&bio_list,
2482 &bio_offset,
2483 clone_size,
2224d879 2484 GFP_NOIO);
f1a4739f 2485 if (!obj_request->bio_list)
62054da6 2486 goto out_unwind;
90e98c52 2487 } else if (type == OBJ_REQUEST_PAGES) {
f1a4739f
AE
2488 unsigned int page_count;
2489
2490 obj_request->pages = pages;
2491 page_count = (u32)calc_pages_for(offset, length);
2492 obj_request->page_count = page_count;
2493 if ((offset + length) & ~PAGE_MASK)
2494 page_count--; /* more on last page */
2495 pages += page_count;
2496 }
bf0d5f50 2497
6d2940c8
GZ
2498 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2499 (op_type == OBJ_OP_WRITE) ? 2 : 1,
2500 obj_request);
2fa12320 2501 if (!osd_req)
62054da6 2502 goto out_unwind;
3b434a2a 2503
2fa12320 2504 obj_request->osd_req = osd_req;
2169238d 2505 obj_request->callback = rbd_img_obj_callback;
3b434a2a 2506 obj_request->img_offset = img_offset;
9d4df01f 2507
3b434a2a 2508 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
430c28c3 2509
7da22d29 2510 img_offset += length;
bf0d5f50
AE
2511 resid -= length;
2512 }
2513
2514 return 0;
2515
bf0d5f50
AE
2516out_unwind:
2517 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
42dd037c 2518 rbd_img_obj_request_del(img_request, obj_request);
bf0d5f50
AE
2519
2520 return -ENOMEM;
2521}
2522
0eefd470 2523static void
2761713d 2524rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
0eefd470
AE
2525{
2526 struct rbd_img_request *img_request;
2527 struct rbd_device *rbd_dev;
ebda6408 2528 struct page **pages;
0eefd470
AE
2529 u32 page_count;
2530
2761713d
ID
2531 dout("%s: obj %p\n", __func__, obj_request);
2532
d3246fb0
JD
2533 rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2534 obj_request->type == OBJ_REQUEST_NODATA);
0eefd470
AE
2535 rbd_assert(obj_request_img_data_test(obj_request));
2536 img_request = obj_request->img_request;
2537 rbd_assert(img_request);
2538
2539 rbd_dev = img_request->rbd_dev;
2540 rbd_assert(rbd_dev);
0eefd470 2541
ebda6408
AE
2542 pages = obj_request->copyup_pages;
2543 rbd_assert(pages != NULL);
0eefd470 2544 obj_request->copyup_pages = NULL;
ebda6408
AE
2545 page_count = obj_request->copyup_page_count;
2546 rbd_assert(page_count);
2547 obj_request->copyup_page_count = 0;
2548 ceph_release_page_vector(pages, page_count);
0eefd470
AE
2549
2550 /*
2551 * We want the transfer count to reflect the size of the
2552 * original write request. There is no such thing as a
2553 * successful short write, so if the request was successful
2554 * we can just set it to the originally-requested length.
2555 */
2556 if (!obj_request->result)
2557 obj_request->xferred = obj_request->length;
2558
2761713d 2559 obj_request_done_set(obj_request);
0eefd470
AE
2560}
2561
3d7efd18
AE
2562static void
2563rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2564{
2565 struct rbd_obj_request *orig_request;
0eefd470 2566 struct ceph_osd_request *osd_req;
0eefd470 2567 struct rbd_device *rbd_dev;
3d7efd18 2568 struct page **pages;
d3246fb0 2569 enum obj_operation_type op_type;
ebda6408 2570 u32 page_count;
bbea1c1a 2571 int img_result;
ebda6408 2572 u64 parent_length;
3d7efd18
AE
2573
2574 rbd_assert(img_request_child_test(img_request));
2575
2576 /* First get what we need from the image request */
2577
2578 pages = img_request->copyup_pages;
2579 rbd_assert(pages != NULL);
2580 img_request->copyup_pages = NULL;
ebda6408
AE
2581 page_count = img_request->copyup_page_count;
2582 rbd_assert(page_count);
2583 img_request->copyup_page_count = 0;
3d7efd18
AE
2584
2585 orig_request = img_request->obj_request;
2586 rbd_assert(orig_request != NULL);
b91f09f1 2587 rbd_assert(obj_request_type_valid(orig_request->type));
bbea1c1a 2588 img_result = img_request->result;
ebda6408 2589 parent_length = img_request->length;
fa355112 2590 rbd_assert(img_result || parent_length == img_request->xferred);
91c6febb 2591 rbd_img_request_put(img_request);
3d7efd18 2592
91c6febb
AE
2593 rbd_assert(orig_request->img_request);
2594 rbd_dev = orig_request->img_request->rbd_dev;
0eefd470 2595 rbd_assert(rbd_dev);
0eefd470 2596
bbea1c1a
AE
2597 /*
2598 * If the overlap has become 0 (most likely because the
2599 * image has been flattened) we need to free the pages
2600 * and re-submit the original write request.
2601 */
2602 if (!rbd_dev->parent_overlap) {
bbea1c1a 2603 ceph_release_page_vector(pages, page_count);
980917fc
ID
2604 rbd_obj_request_submit(orig_request);
2605 return;
bbea1c1a 2606 }
0eefd470 2607
bbea1c1a 2608 if (img_result)
0eefd470 2609 goto out_err;
0eefd470 2610
8785b1d4
AE
2611 /*
 2612	 * The original osd request is of no use to us any more.
0ccd5926	 2613	 * We need a new one that can hold the two or three ops in a copyup
8785b1d4
AE
2614 * request. Allocate the new copyup osd request for the
2615 * original request, and release the old one.
2616 */
bbea1c1a 2617 img_result = -ENOMEM;
0eefd470
AE
2618 osd_req = rbd_osd_req_create_copyup(orig_request);
2619 if (!osd_req)
2620 goto out_err;
8785b1d4 2621 rbd_osd_req_destroy(orig_request->osd_req);
0eefd470
AE
2622 orig_request->osd_req = osd_req;
2623 orig_request->copyup_pages = pages;
ebda6408 2624 orig_request->copyup_page_count = page_count;
3d7efd18 2625
0eefd470 2626 /* Initialize the copyup op */
3d7efd18 2627
0eefd470 2628 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
ebda6408 2629 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
0eefd470 2630 false, false);
3d7efd18 2631
d3246fb0 2632 /* Add the other op(s) */
0eefd470 2633
d3246fb0
JD
2634 op_type = rbd_img_request_op_type(orig_request->img_request);
2635 rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
0eefd470
AE
2636
2637 /* All set, send it off. */
2638
980917fc
ID
2639 rbd_obj_request_submit(orig_request);
2640 return;
0eefd470 2641
0eefd470 2642out_err:
fa355112 2643 ceph_release_page_vector(pages, page_count);
0dcc685e 2644 rbd_obj_request_error(orig_request, img_result);
3d7efd18
AE
2645}
2646
2647/*
2648 * Read from the parent image the range of data that covers the
2649 * entire target of the given object request. This is used for
2650 * satisfying a layered image write request when the target of an
2651 * object request from the image request does not exist.
2652 *
2653 * A page array big enough to hold the returned data is allocated
2654 * and supplied to rbd_img_request_fill() as the "data descriptor."
2655 * When the read completes, this page array will be transferred to
2656 * the original object request for the copyup operation.
2657 *
c2e82414
ID
2658 * If an error occurs, it is recorded as the result of the original
2659 * object request in rbd_img_obj_exists_callback().
3d7efd18
AE
2660 */
2661static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2662{
058aa991 2663 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
3d7efd18 2664 struct rbd_img_request *parent_request = NULL;
3d7efd18
AE
2665 u64 img_offset;
2666 u64 length;
2667 struct page **pages = NULL;
2668 u32 page_count;
2669 int result;
2670
3d7efd18
AE
2671 rbd_assert(rbd_dev->parent != NULL);
2672
2673 /*
2674 * Determine the byte range covered by the object in the
2675 * child image to which the original request was to be sent.
2676 */
2677 img_offset = obj_request->img_offset - obj_request->offset;
5bc3fb17 2678 length = rbd_obj_bytes(&rbd_dev->header);
3d7efd18 2679
a9e8ba2c
AE
2680 /*
2681 * There is no defined parent data beyond the parent
2682 * overlap, so limit what we read at that boundary if
2683 * necessary.
2684 */
2685 if (img_offset + length > rbd_dev->parent_overlap) {
2686 rbd_assert(img_offset < rbd_dev->parent_overlap);
2687 length = rbd_dev->parent_overlap - img_offset;
2688 }
2689
3d7efd18
AE
2690 /*
2691 * Allocate a page array big enough to receive the data read
2692 * from the parent.
2693 */
2694 page_count = (u32)calc_pages_for(0, length);
2695 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2696 if (IS_ERR(pages)) {
2697 result = PTR_ERR(pages);
2698 pages = NULL;
2699 goto out_err;
2700 }
2701
2702 result = -ENOMEM;
e93f3152
AE
2703 parent_request = rbd_parent_request_create(obj_request,
2704 img_offset, length);
3d7efd18
AE
2705 if (!parent_request)
2706 goto out_err;
3d7efd18
AE
2707
2708 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2709 if (result)
2710 goto out_err;
058aa991 2711
3d7efd18 2712 parent_request->copyup_pages = pages;
ebda6408 2713 parent_request->copyup_page_count = page_count;
3d7efd18 2714 parent_request->callback = rbd_img_obj_parent_read_full_callback;
058aa991 2715
3d7efd18
AE
2716 result = rbd_img_request_submit(parent_request);
2717 if (!result)
2718 return 0;
2719
2720 parent_request->copyup_pages = NULL;
ebda6408 2721 parent_request->copyup_page_count = 0;
3d7efd18
AE
2722 parent_request->obj_request = NULL;
2723 rbd_obj_request_put(obj_request);
2724out_err:
2725 if (pages)
2726 ceph_release_page_vector(pages, page_count);
2727 if (parent_request)
2728 rbd_img_request_put(parent_request);
3d7efd18
AE
2729 return result;
2730}
2731
c5b5ef6c
AE
2732static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2733{
c5b5ef6c 2734 struct rbd_obj_request *orig_request;
638f5abe 2735 struct rbd_device *rbd_dev;
c5b5ef6c
AE
2736 int result;
2737
2738 rbd_assert(!obj_request_img_data_test(obj_request));
2739
2740 /*
2741 * All we need from the object request is the original
2742 * request and the result of the STAT op. Grab those, then
2743 * we're done with the request.
2744 */
2745 orig_request = obj_request->obj_request;
2746 obj_request->obj_request = NULL;
912c317d 2747 rbd_obj_request_put(orig_request);
c5b5ef6c
AE
2748 rbd_assert(orig_request);
2749 rbd_assert(orig_request->img_request);
2750
2751 result = obj_request->result;
2752 obj_request->result = 0;
2753
2754 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2755 obj_request, orig_request, result,
2756 obj_request->xferred, obj_request->length);
2757 rbd_obj_request_put(obj_request);
2758
638f5abe
AE
2759 /*
2760 * If the overlap has become 0 (most likely because the
980917fc
ID
2761 * image has been flattened) we need to re-submit the
2762 * original request.
638f5abe
AE
2763 */
2764 rbd_dev = orig_request->img_request->rbd_dev;
2765 if (!rbd_dev->parent_overlap) {
980917fc
ID
2766 rbd_obj_request_submit(orig_request);
2767 return;
638f5abe 2768 }
c5b5ef6c
AE
2769
2770 /*
2771 * Our only purpose here is to determine whether the object
2772 * exists, and we don't want to treat the non-existence as
2773 * an error. If something else comes back, transfer the
2774 * error to the original request and complete it now.
2775 */
2776 if (!result) {
2777 obj_request_existence_set(orig_request, true);
2778 } else if (result == -ENOENT) {
2779 obj_request_existence_set(orig_request, false);
c2e82414
ID
2780 } else {
2781 goto fail_orig_request;
c5b5ef6c
AE
2782 }
2783
2784 /*
2785 * Resubmit the original request now that we have recorded
2786 * whether the target object exists.
2787 */
c2e82414
ID
2788 result = rbd_img_obj_request_submit(orig_request);
2789 if (result)
2790 goto fail_orig_request;
2791
2792 return;
2793
2794fail_orig_request:
0dcc685e 2795 rbd_obj_request_error(orig_request, result);
c5b5ef6c
AE
2796}
2797
2798static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2799{
058aa991 2800 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
c5b5ef6c 2801 struct rbd_obj_request *stat_request;
710214e3 2802 struct page **pages;
c5b5ef6c
AE
2803 u32 page_count;
2804 size_t size;
2805 int ret;
2806
6c696d85 2807 stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
710214e3
ID
2808 if (!stat_request)
2809 return -ENOMEM;
2810
a90bb0c1
ID
2811 stat_request->object_no = obj_request->object_no;
2812
710214e3
ID
2813 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2814 stat_request);
2815 if (!stat_request->osd_req) {
2816 ret = -ENOMEM;
2817 goto fail_stat_request;
2818 }
2819
c5b5ef6c
AE
2820 /*
2821 * The response data for a STAT call consists of:
2822 * le64 length;
2823 * struct {
2824 * le32 tv_sec;
2825 * le32 tv_nsec;
2826 * } mtime;
2827 */
2828 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2829 page_count = (u32)calc_pages_for(0, size);
2830 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
710214e3
ID
2831 if (IS_ERR(pages)) {
2832 ret = PTR_ERR(pages);
2833 goto fail_stat_request;
2834 }
c5b5ef6c 2835
710214e3
ID
2836 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2837 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2838 false, false);
c5b5ef6c
AE
2839
2840 rbd_obj_request_get(obj_request);
2841 stat_request->obj_request = obj_request;
2842 stat_request->pages = pages;
2843 stat_request->page_count = page_count;
c5b5ef6c
AE
2844 stat_request->callback = rbd_img_obj_exists_callback;
2845
980917fc
ID
2846 rbd_obj_request_submit(stat_request);
2847 return 0;
c5b5ef6c 2848
710214e3
ID
2849fail_stat_request:
2850 rbd_obj_request_put(stat_request);
c5b5ef6c
AE
2851 return ret;
2852}
2853
70d045f6 2854static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
b454e36d 2855{
058aa991
ID
2856 struct rbd_img_request *img_request = obj_request->img_request;
2857 struct rbd_device *rbd_dev = img_request->rbd_dev;
b454e36d 2858
70d045f6 2859 /* Reads */
1c220881
JD
2860 if (!img_request_write_test(img_request) &&
2861 !img_request_discard_test(img_request))
70d045f6
ID
2862 return true;
2863
2864 /* Non-layered writes */
2865 if (!img_request_layered_test(img_request))
2866 return true;
2867
b454e36d 2868 /*
70d045f6
ID
2869 * Layered writes outside of the parent overlap range don't
2870 * share any data with the parent.
b454e36d 2871 */
70d045f6
ID
2872 if (!obj_request_overlaps_parent(obj_request))
2873 return true;
b454e36d 2874
c622d226
GZ
2875 /*
2876 * Entire-object layered writes - we will overwrite whatever
2877 * parent data there is anyway.
2878 */
2879 if (!obj_request->offset &&
2880 obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2881 return true;
2882
70d045f6
ID
2883 /*
2884 * If the object is known to already exist, its parent data has
2885 * already been copied.
2886 */
2887 if (obj_request_known_test(obj_request) &&
2888 obj_request_exists_test(obj_request))
2889 return true;
2890
2891 return false;
2892}
2893
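/*
 * Submit one object request, applying the layered-image state machine
 * when needed: simple requests go straight to the OSD; layered writes
 * may first STAT the target object and/or read the covering range
 * from the parent so a copyup can be issued.
 */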
2894static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2895{
058aa991
ID
2896 rbd_assert(obj_request_img_data_test(obj_request));
2897 rbd_assert(obj_request_type_valid(obj_request->type));
2898 rbd_assert(obj_request->img_request);
b454e36d 2899
70d045f6 2900 if (img_obj_request_simple(obj_request)) {
980917fc
ID
2901 rbd_obj_request_submit(obj_request);
2902 return 0;
b454e36d
AE
2903 }
2904
2905 /*
3d7efd18
AE
2906 * It's a layered write. The target object might exist but
2907 * we may not know that yet. If we know it doesn't exist,
2908 * start by reading the data for the full target object from
2909 * the parent so we can use it for a copyup to the target.
b454e36d 2910 */
70d045f6 2911 if (obj_request_known_test(obj_request))
3d7efd18
AE
2912 return rbd_img_obj_parent_read_full(obj_request);
2913
2914 /* We don't know whether the target exists. Go find out. */
b454e36d
AE
2915
2916 return rbd_img_obj_exists_submit(obj_request);
2917}
2918
bf0d5f50
AE
2919static int rbd_img_request_submit(struct rbd_img_request *img_request)
2920{
bf0d5f50 2921 struct rbd_obj_request *obj_request;
46faeed4 2922 struct rbd_obj_request *next_obj_request;
663ae2cc 2923 int ret = 0;
bf0d5f50 2924
37206ee5 2925 dout("%s: img %p\n", __func__, img_request);
bf0d5f50 2926
663ae2cc
ID
2927 rbd_img_request_get(img_request);
2928 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
b454e36d 2929 ret = rbd_img_obj_request_submit(obj_request);
bf0d5f50 2930 if (ret)
663ae2cc 2931 goto out_put_ireq;
bf0d5f50
AE
2932 }
2933
663ae2cc
ID
2934out_put_ireq:
2935 rbd_img_request_put(img_request);
2936 return ret;
bf0d5f50 2937}
8b3e1a56
AE
2938
2939static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2940{
2941 struct rbd_obj_request *obj_request;
a9e8ba2c
AE
2942 struct rbd_device *rbd_dev;
2943 u64 obj_end;
02c74fba
AE
2944 u64 img_xferred;
2945 int img_result;
8b3e1a56
AE
2946
2947 rbd_assert(img_request_child_test(img_request));
2948
02c74fba
AE
2949 /* First get what we need from the image request and release it */
2950
8b3e1a56 2951 obj_request = img_request->obj_request;
02c74fba
AE
2952 img_xferred = img_request->xferred;
2953 img_result = img_request->result;
2954 rbd_img_request_put(img_request);
2955
2956 /*
2957 * If the overlap has become 0 (most likely because the
2958 * image has been flattened) we need to re-submit the
2959 * original request.
2960 */
a9e8ba2c
AE
2961 rbd_assert(obj_request);
2962 rbd_assert(obj_request->img_request);
02c74fba
AE
2963 rbd_dev = obj_request->img_request->rbd_dev;
2964 if (!rbd_dev->parent_overlap) {
980917fc
ID
2965 rbd_obj_request_submit(obj_request);
2966 return;
02c74fba 2967 }
a9e8ba2c 2968
02c74fba 2969 obj_request->result = img_result;
a9e8ba2c
AE
2970 if (obj_request->result)
2971 goto out;
2972
2973 /*
2974 * We need to zero anything beyond the parent overlap
2975 * boundary. Since rbd_img_obj_request_read_callback()
2976 * will zero anything beyond the end of a short read, an
2977 * easy way to do this is to pretend the data from the
2978 * parent came up short--ending at the overlap boundary.
2979 */
2980 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2981 obj_end = obj_request->img_offset + obj_request->length;
a9e8ba2c
AE
2982 if (obj_end > rbd_dev->parent_overlap) {
2983 u64 xferred = 0;
2984
2985 if (obj_request->img_offset < rbd_dev->parent_overlap)
2986 xferred = rbd_dev->parent_overlap -
2987 obj_request->img_offset;
8b3e1a56 2988
02c74fba 2989 obj_request->xferred = min(img_xferred, xferred);
a9e8ba2c 2990 } else {
02c74fba 2991 obj_request->xferred = img_xferred;
a9e8ba2c
AE
2992 }
2993out:
8b3e1a56
AE
2994 rbd_img_obj_request_read_callback(obj_request);
2995 rbd_obj_request_complete(obj_request);
2996}
2997
2998static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2999{
8b3e1a56
AE
3000 struct rbd_img_request *img_request;
3001 int result;
3002
3003 rbd_assert(obj_request_img_data_test(obj_request));
3004 rbd_assert(obj_request->img_request != NULL);
3005 rbd_assert(obj_request->result == (s32) -ENOENT);
5b2ab72d 3006 rbd_assert(obj_request_type_valid(obj_request->type));
8b3e1a56 3007
8b3e1a56 3008 /* rbd_read_finish(obj_request, obj_request->length); */
e93f3152 3009 img_request = rbd_parent_request_create(obj_request,
8b3e1a56 3010 obj_request->img_offset,
e93f3152 3011 obj_request->length);
8b3e1a56
AE
3012 result = -ENOMEM;
3013 if (!img_request)
3014 goto out_err;
3015
5b2ab72d
AE
3016 if (obj_request->type == OBJ_REQUEST_BIO)
3017 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3018 obj_request->bio_list);
3019 else
3020 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3021 obj_request->pages);
8b3e1a56
AE
3022 if (result)
3023 goto out_err;
3024
3025 img_request->callback = rbd_img_parent_read_callback;
3026 result = rbd_img_request_submit(img_request);
3027 if (result)
3028 goto out_err;
3029
3030 return;
3031out_err:
3032 if (img_request)
3033 rbd_img_request_put(img_request);
3034 obj_request->result = result;
3035 obj_request->xferred = 0;
3036 obj_request_done_set(obj_request);
3037}
bf0d5f50 3038
ed95b21a 3039static const struct rbd_client_id rbd_empty_cid;
b8d70035 3040
ed95b21a
ID
3041static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3042 const struct rbd_client_id *rhs)
3043{
3044 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3045}
3046
3047static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3048{
3049 struct rbd_client_id cid;
3050
3051 mutex_lock(&rbd_dev->watch_mutex);
3052 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3053 cid.handle = rbd_dev->watch_cookie;
3054 mutex_unlock(&rbd_dev->watch_mutex);
3055 return cid;
3056}
3057
3058/*
3059 * lock_rwsem must be held for write
3060 */
3061static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3062 const struct rbd_client_id *cid)
3063{
3064 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3065 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3066 cid->gid, cid->handle);
3067 rbd_dev->owner_cid = *cid; /* struct */
3068}
3069
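/*
 * The lock cookie ties the exclusive lock to our watch: it is the
 * cookie prefix followed by the current watch cookie.
 */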
3070static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3071{
3072 mutex_lock(&rbd_dev->watch_mutex);
3073 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3074 mutex_unlock(&rbd_dev->watch_mutex);
3075}
3076
3077/*
3078 * lock_rwsem must be held for write
3079 */
3080static int rbd_lock(struct rbd_device *rbd_dev)
b8d70035 3081{
922dab61 3082 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a
ID
3083 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3084 char cookie[32];
e627db08 3085 int ret;
b8d70035 3086
cbbfb0ff
ID
3087 WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3088 rbd_dev->lock_cookie[0] != '\0');
52bb1f9b 3089
ed95b21a
ID
3090 format_lock_cookie(rbd_dev, cookie);
3091 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3092 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3093 RBD_LOCK_TAG, "", 0);
e627db08 3094 if (ret)
ed95b21a 3095 return ret;
b8d70035 3096
ed95b21a 3097 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
cbbfb0ff 3098 strcpy(rbd_dev->lock_cookie, cookie);
ed95b21a
ID
3099 rbd_set_owner_cid(rbd_dev, &cid);
3100 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3101 return 0;
b8d70035
AE
3102}
3103
ed95b21a
ID
3104/*
3105 * lock_rwsem must be held for write
3106 */
bbead745 3107static void rbd_unlock(struct rbd_device *rbd_dev)
bb040aa0 3108{
922dab61 3109 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
bb040aa0
ID
3110 int ret;
3111
cbbfb0ff
ID
3112 WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3113 rbd_dev->lock_cookie[0] == '\0');
bb040aa0 3114
ed95b21a 3115 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
cbbfb0ff 3116 RBD_LOCK_NAME, rbd_dev->lock_cookie);
bbead745
ID
3117 if (ret && ret != -ENOENT)
3118 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
bb040aa0 3119
bbead745
ID
3120 /* treat errors as the image is unlocked */
3121 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
cbbfb0ff 3122 rbd_dev->lock_cookie[0] = '\0';
ed95b21a
ID
3123 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3124 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
bb040aa0
ID
3125}
3126
ed95b21a
ID
3127static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3128 enum rbd_notify_op notify_op,
3129 struct page ***preply_pages,
3130 size_t *preply_len)
9969ebc5
AE
3131{
3132 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a
ID
3133 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3134 int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3135 char buf[buf_size];
3136 void *p = buf;
9969ebc5 3137
ed95b21a 3138 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
9969ebc5 3139
ed95b21a
ID
3140 /* encode *LockPayload NotifyMessage (op + ClientId) */
3141 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3142 ceph_encode_32(&p, notify_op);
3143 ceph_encode_64(&p, cid.gid);
3144 ceph_encode_64(&p, cid.handle);
8eb87565 3145
ed95b21a
ID
3146 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3147 &rbd_dev->header_oloc, buf, buf_size,
3148 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
b30a01f2
ID
3149}
3150
ed95b21a
ID
3151static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3152 enum rbd_notify_op notify_op)
b30a01f2 3153{
ed95b21a
ID
3154 struct page **reply_pages;
3155 size_t reply_len;
b30a01f2 3156
ed95b21a
ID
3157 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3158 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3159}
b30a01f2 3160
ed95b21a
ID
3161static void rbd_notify_acquired_lock(struct work_struct *work)
3162{
3163 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3164 acquired_lock_work);
76756a51 3165
ed95b21a 3166 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
c525f036
ID
3167}
3168
ed95b21a 3169static void rbd_notify_released_lock(struct work_struct *work)
c525f036 3170{
ed95b21a
ID
3171 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3172 released_lock_work);
811c6688 3173
ed95b21a 3174 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
fca27065
ID
3175}
3176
ed95b21a 3177static int rbd_request_lock(struct rbd_device *rbd_dev)
36be9a76 3178{
ed95b21a
ID
3179 struct page **reply_pages;
3180 size_t reply_len;
3181 bool lock_owner_responded = false;
36be9a76
AE
3182 int ret;
3183
ed95b21a 3184 dout("%s rbd_dev %p\n", __func__, rbd_dev);
36be9a76 3185
ed95b21a
ID
3186 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3187 &reply_pages, &reply_len);
3188 if (ret && ret != -ETIMEDOUT) {
3189 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
36be9a76 3190 goto out;
ed95b21a 3191 }
36be9a76 3192
ed95b21a
ID
3193 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3194 void *p = page_address(reply_pages[0]);
3195 void *const end = p + reply_len;
3196 u32 n;
36be9a76 3197
ed95b21a
ID
3198 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3199 while (n--) {
3200 u8 struct_v;
3201 u32 len;
36be9a76 3202
ed95b21a
ID
3203 ceph_decode_need(&p, end, 8 + 8, e_inval);
3204 p += 8 + 8; /* skip gid and cookie */
04017e29 3205
ed95b21a
ID
3206 ceph_decode_32_safe(&p, end, len, e_inval);
3207 if (!len)
3208 continue;
3209
3210 if (lock_owner_responded) {
3211 rbd_warn(rbd_dev,
3212 "duplicate lock owners detected");
3213 ret = -EIO;
3214 goto out;
3215 }
3216
3217 lock_owner_responded = true;
3218 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3219 &struct_v, &len);
3220 if (ret) {
3221 rbd_warn(rbd_dev,
3222 "failed to decode ResponseMessage: %d",
3223 ret);
3224 goto e_inval;
3225 }
3226
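			/* the ResponseMessage carries the lock owner's result code */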
3227 ret = ceph_decode_32(&p);
3228 }
3229 }
3230
3231 if (!lock_owner_responded) {
3232 rbd_warn(rbd_dev, "no lock owners detected");
3233 ret = -ETIMEDOUT;
3234 }
3235
3236out:
3237 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3238 return ret;
3239
3240e_inval:
3241 ret = -EINVAL;
3242 goto out;
3243}
3244
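/*
 * Kick processes blocked on lock_waitq waiting for the lock state to
 * settle; wake_all is used when the new state is final.
 */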
3245static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3246{
3247 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3248
3249 cancel_delayed_work(&rbd_dev->lock_dwork);
3250 if (wake_all)
3251 wake_up_all(&rbd_dev->lock_waitq);
3252 else
3253 wake_up(&rbd_dev->lock_waitq);
3254}
3255
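/*
 * Fetch the current holder(s) of the header object lock and sanity
 * check that the lock was taken by rbd: our tag, an exclusive lock
 * type and a cookie with the expected prefix.
 */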
3256static int get_lock_owner_info(struct rbd_device *rbd_dev,
3257 struct ceph_locker **lockers, u32 *num_lockers)
3258{
3259 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3260 u8 lock_type;
3261 char *lock_tag;
3262 int ret;
3263
3264 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3265
3266 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3267 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3268 &lock_type, &lock_tag, lockers, num_lockers);
3269 if (ret)
3270 return ret;
3271
3272 if (*num_lockers == 0) {
3273 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3274 goto out;
3275 }
3276
3277 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3278 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3279 lock_tag);
3280 ret = -EBUSY;
3281 goto out;
3282 }
3283
3284 if (lock_type == CEPH_CLS_LOCK_SHARED) {
3285 rbd_warn(rbd_dev, "shared lock type detected");
3286 ret = -EBUSY;
3287 goto out;
3288 }
3289
3290 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3291 strlen(RBD_LOCK_COOKIE_PREFIX))) {
3292 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3293 (*lockers)[0].id.cookie);
3294 ret = -EBUSY;
3295 goto out;
3296 }
3297
3298out:
3299 kfree(lock_tag);
3300 return ret;
3301}
3302
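/*
 * Check whether the given locker still has its watch established on
 * the header object.  Returns 1 if the owner appears to be alive,
 * 0 if its watch is gone (the lock can be broken), or a negative
 * error.
 */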
3303static int find_watcher(struct rbd_device *rbd_dev,
3304 const struct ceph_locker *locker)
3305{
3306 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3307 struct ceph_watch_item *watchers;
3308 u32 num_watchers;
3309 u64 cookie;
3310 int i;
3311 int ret;
3312
3313 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3314 &rbd_dev->header_oloc, &watchers,
3315 &num_watchers);
3316 if (ret)
3317 return ret;
3318
3319 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3320 for (i = 0; i < num_watchers; i++) {
3321 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3322 sizeof(locker->info.addr)) &&
3323 watchers[i].cookie == cookie) {
3324 struct rbd_client_id cid = {
3325 .gid = le64_to_cpu(watchers[i].name.num),
3326 .handle = cookie,
3327 };
3328
3329 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3330 rbd_dev, cid.gid, cid.handle);
3331 rbd_set_owner_cid(rbd_dev, &cid);
3332 ret = 1;
3333 goto out;
3334 }
3335 }
3336
3337 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3338 ret = 0;
3339out:
3340 kfree(watchers);
3341 return ret;
3342}
3343
3344/*
3345 * lock_rwsem must be held for write
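 *
 * Keep trying to take the lock.  On -EBUSY, look up the current
 * owner; if its watch is gone, blacklist it and break the stale
 * lock before retrying.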
3346 */
3347static int rbd_try_lock(struct rbd_device *rbd_dev)
3348{
3349 struct ceph_client *client = rbd_dev->rbd_client->client;
3350 struct ceph_locker *lockers;
3351 u32 num_lockers;
3352 int ret;
3353
3354 for (;;) {
3355 ret = rbd_lock(rbd_dev);
3356 if (ret != -EBUSY)
3357 return ret;
3358
3359 /* determine if the current lock holder is still alive */
3360 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3361 if (ret)
3362 return ret;
3363
3364 if (num_lockers == 0)
3365 goto again;
3366
3367 ret = find_watcher(rbd_dev, lockers);
3368 if (ret) {
3369 if (ret > 0)
3370 ret = 0; /* have to request lock */
3371 goto out;
3372 }
3373
3374 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3375 ENTITY_NAME(lockers[0].id.name));
3376
3377 ret = ceph_monc_blacklist_add(&client->monc,
3378 &lockers[0].info.addr);
3379 if (ret) {
3380 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3381 ENTITY_NAME(lockers[0].id.name), ret);
3382 goto out;
3383 }
3384
3385 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3386 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3387 lockers[0].id.cookie,
3388 &lockers[0].id.name);
3389 if (ret && ret != -ENOENT)
3390 goto out;
3391
3392again:
3393 ceph_free_lockers(lockers, num_lockers);
3394 }
3395
3396out:
3397 ceph_free_lockers(lockers, num_lockers);
3398 return ret;
3399}
3400
3401/*
3402 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3403 */
3404static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3405 int *pret)
3406{
3407 enum rbd_lock_state lock_state;
3408
3409 down_read(&rbd_dev->lock_rwsem);
3410 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3411 rbd_dev->lock_state);
3412 if (__rbd_is_lock_owner(rbd_dev)) {
3413 lock_state = rbd_dev->lock_state;
3414 up_read(&rbd_dev->lock_rwsem);
3415 return lock_state;
3416 }
3417
3418 up_read(&rbd_dev->lock_rwsem);
3419 down_write(&rbd_dev->lock_rwsem);
3420 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3421 rbd_dev->lock_state);
3422 if (!__rbd_is_lock_owner(rbd_dev)) {
3423 *pret = rbd_try_lock(rbd_dev);
3424 if (*pret)
3425 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3426 }
3427
3428 lock_state = rbd_dev->lock_state;
3429 up_write(&rbd_dev->lock_rwsem);
3430 return lock_state;
3431}
3432
3433static void rbd_acquire_lock(struct work_struct *work)
3434{
3435 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3436 struct rbd_device, lock_dwork);
3437 enum rbd_lock_state lock_state;
3438 int ret = 0;
3439
3440 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3441again:
3442 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3443 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3444 if (lock_state == RBD_LOCK_STATE_LOCKED)
3445 wake_requests(rbd_dev, true);
3446 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3447 rbd_dev, lock_state, ret);
3448 return;
3449 }
3450
3451 ret = rbd_request_lock(rbd_dev);
3452 if (ret == -ETIMEDOUT) {
3453 goto again; /* treat this as a dead client */
3454 } else if (ret == -EROFS) {
3455 rbd_warn(rbd_dev, "peer will not release lock");
3456 /*
3457 * If this is rbd_add_acquire_lock(), we want to fail
3458 * immediately -- reuse BLACKLISTED flag. Otherwise we
3459 * want to block.
3460 */
3461 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3462 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3463 /* wake "rbd map --exclusive" process */
3464 wake_requests(rbd_dev, false);
3465 }
3466 } else if (ret < 0) {
3467 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3468 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3469 RBD_RETRY_DELAY);
3470 } else {
3471 /*
3472 * lock owner acked, but resend if we don't see them
3473 * release the lock
3474 */
3475 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3476 rbd_dev);
3477 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3478 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3479 }
3480}
3481
3482/*
3483 * lock_rwsem must be held for write
3484 */
3485static bool rbd_release_lock(struct rbd_device *rbd_dev)
3486{
3487 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3488 rbd_dev->lock_state);
3489 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3490 return false;
3491
3492 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3493 downgrade_write(&rbd_dev->lock_rwsem);
3494 /*
3495 * Ensure that all in-flight IO is flushed.
3496 *
3497 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3498 * may be shared with other devices.
3499 */
3500 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3501 up_read(&rbd_dev->lock_rwsem);
3502
3503 down_write(&rbd_dev->lock_rwsem);
3504 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3505 rbd_dev->lock_state);
3506 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3507 return false;
3508
3509 rbd_unlock(rbd_dev);
3510 /*
3511 * Give others a chance to grab the lock - we would re-acquire
3512 * almost immediately if we got new IO during ceph_osdc_sync()
3513 * otherwise. We need to ack our own notifications, so this
3514 * lock_dwork will be requeued from rbd_wait_state_locked()
3515 * after wake_requests() in rbd_handle_released_lock().
3516 */
3517 cancel_delayed_work(&rbd_dev->lock_dwork);
3518 return true;
3519}
3520
3521static void rbd_release_lock_work(struct work_struct *work)
3522{
3523 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3524 unlock_work);
3525
3526 down_write(&rbd_dev->lock_rwsem);
3527 rbd_release_lock(rbd_dev);
3528 up_write(&rbd_dev->lock_rwsem);
3529}
3530
3531static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3532 void **p)
3533{
3534 struct rbd_client_id cid = { 0 };
3535
3536 if (struct_v >= 2) {
3537 cid.gid = ceph_decode_64(p);
3538 cid.handle = ceph_decode_64(p);
3539 }
3540
3541 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3542 cid.handle);
3543 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3544 down_write(&rbd_dev->lock_rwsem);
3545 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3546 /*
3547 * we already know that the remote client is
3548 * the owner
3549 */
3550 up_write(&rbd_dev->lock_rwsem);
3551 return;
3552 }
3553
3554 rbd_set_owner_cid(rbd_dev, &cid);
3555 downgrade_write(&rbd_dev->lock_rwsem);
3556 } else {
3557 down_read(&rbd_dev->lock_rwsem);
3558 }
3559
3560 if (!__rbd_is_lock_owner(rbd_dev))
3561 wake_requests(rbd_dev, false);
3562 up_read(&rbd_dev->lock_rwsem);
3563}
3564
3565static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3566 void **p)
3567{
3568 struct rbd_client_id cid = { 0 };
3569
3570 if (struct_v >= 2) {
3571 cid.gid = ceph_decode_64(p);
3572 cid.handle = ceph_decode_64(p);
3573 }
3574
3575 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3576 cid.handle);
3577 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3578 down_write(&rbd_dev->lock_rwsem);
3579 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3580 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3581 __func__, rbd_dev, cid.gid, cid.handle,
3582 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3583 up_write(&rbd_dev->lock_rwsem);
3584 return;
3585 }
3586
3587 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3588 downgrade_write(&rbd_dev->lock_rwsem);
3589 } else {
3590 down_read(&rbd_dev->lock_rwsem);
3591 }
3592
3593 if (!__rbd_is_lock_owner(rbd_dev))
3594 wake_requests(rbd_dev, false);
3595 up_read(&rbd_dev->lock_rwsem);
3596}
3597
3598/*
3599 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3600 * ResponseMessage is needed.
3601 */
3602static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3603 void **p)
3604{
3605 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3606 struct rbd_client_id cid = { 0 };
3607 int result = 1;
3608
3609 if (struct_v >= 2) {
3610 cid.gid = ceph_decode_64(p);
3611 cid.handle = ceph_decode_64(p);
3612 }
3613
3614 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3615 cid.handle);
3616 if (rbd_cid_equal(&cid, &my_cid))
3617 return result;
3618
3619 down_read(&rbd_dev->lock_rwsem);
3620 if (__rbd_is_lock_owner(rbd_dev)) {
3621 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3622 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3623 goto out_unlock;
3624
3625 /*
3626 * encode ResponseMessage(0) so the peer can detect
3627 * a missing owner
3628 */
3629 result = 0;
3630
3631 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3632 if (!rbd_dev->opts->exclusive) {
3633 dout("%s rbd_dev %p queueing unlock_work\n",
3634 __func__, rbd_dev);
3635 queue_work(rbd_dev->task_wq,
3636 &rbd_dev->unlock_work);
3637 } else {
3638 /* refuse to release the lock */
3639 result = -EROFS;
3640 }
3641 }
3642 }
3643
3644out_unlock:
3645 up_read(&rbd_dev->lock_rwsem);
3646 return result;
3647}
3648
3649static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3650 u64 notify_id, u64 cookie, s32 *result)
3651{
3652 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3653 int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3654 char buf[buf_size];
3655 int ret;
3656
3657 if (result) {
3658 void *p = buf;
3659
3660 /* encode ResponseMessage */
3661 ceph_start_encoding(&p, 1, 1,
3662 buf_size - CEPH_ENCODING_START_BLK_LEN);
3663 ceph_encode_32(&p, *result);
3664 } else {
3665 buf_size = 0;
3666 }
3667
3668 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3669 &rbd_dev->header_oloc, notify_id, cookie,
3670 buf, buf_size);
3671 if (ret)
3672 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3673}
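/*
 * For reference, a sketch of the notify-ack payload built above,
 * assuming CEPH_ENCODING_START_BLK_LEN is the usual 6-byte envelope:
 *
 *	u8   struct_v = 1	\
 *	u8   struct_compat = 1	 > ceph_start_encoding()
 *	le32 struct_len = 4	/
 *	le32 result		   ceph_encode_32()
 *
 * i.e. buf_size = 4 + CEPH_ENCODING_START_BLK_LEN bytes when a result
 * is supplied, and an empty payload otherwise.
 */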
3674
3675static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3676 u64 cookie)
3677{
3678 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3679 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3680}
3681
3682static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3683 u64 notify_id, u64 cookie, s32 result)
3684{
3685 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3686 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3687}
3688
3689static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3690 u64 notifier_id, void *data, size_t data_len)
3691{
3692 struct rbd_device *rbd_dev = arg;
3693 void *p = data;
3694 void *const end = p + data_len;
3695 u8 struct_v = 0;
3696 u32 len;
3697 u32 notify_op;
3698 int ret;
3699
3700 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3701 __func__, rbd_dev, cookie, notify_id, data_len);
3702 if (data_len) {
3703 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3704 &struct_v, &len);
3705 if (ret) {
3706 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3707 ret);
3708 return;
3709 }
3710
3711 notify_op = ceph_decode_32(&p);
3712 } else {
3713 /* legacy notification for header updates */
3714 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3715 len = 0;
3716 }
3717
3718 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3719 switch (notify_op) {
3720 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3721 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3722 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3723 break;
3724 case RBD_NOTIFY_OP_RELEASED_LOCK:
3725 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3726 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3727 break;
3728 case RBD_NOTIFY_OP_REQUEST_LOCK:
3729 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3730 if (ret <= 0)
3731 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3732 cookie, ret);
3733 else
3734 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3735 break;
3736 case RBD_NOTIFY_OP_HEADER_UPDATE:
3737 ret = rbd_dev_refresh(rbd_dev);
3738 if (ret)
3739 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3740
3741 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3742 break;
3743 default:
3744 if (rbd_is_lock_owner(rbd_dev))
3745 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3746 cookie, -EOPNOTSUPP);
3747 else
3748 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3749 break;
3750 }
3751}
3752
3753static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3754
3755 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3756 {
3757 struct rbd_device *rbd_dev = arg;
3758
3759 rbd_warn(rbd_dev, "encountered watch error: %d", err);
3760
3761 down_write(&rbd_dev->lock_rwsem);
3762 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3763 up_write(&rbd_dev->lock_rwsem);
3764
3765 mutex_lock(&rbd_dev->watch_mutex);
3766 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3767 __rbd_unregister_watch(rbd_dev);
3768 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3769
3770 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3771 }
3772 mutex_unlock(&rbd_dev->watch_mutex);
3773}
3774
3775 /*
3776 * watch_mutex must be locked
3777 */
3778 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3779{
3780 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3781 struct ceph_osd_linger_request *handle;
3782
3783 rbd_assert(!rbd_dev->watch_handle);
3784 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3785
3786 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3787 &rbd_dev->header_oloc, rbd_watch_cb,
3788 rbd_watch_errcb, rbd_dev);
3789 if (IS_ERR(handle))
3790 return PTR_ERR(handle);
3791
3792 rbd_dev->watch_handle = handle;
3793 return 0;
3794}
3795
3796/*
3797 * watch_mutex must be locked
3798 */
3799static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3800 {
3801 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3802 int ret;
3803
3804 rbd_assert(rbd_dev->watch_handle);
3805 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3806
3807 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3808 if (ret)
3809 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3810
3811 rbd_dev->watch_handle = NULL;
3812}
3813
3814static int rbd_register_watch(struct rbd_device *rbd_dev)
3815{
3816 int ret;
3817
3818 mutex_lock(&rbd_dev->watch_mutex);
3819 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3820 ret = __rbd_register_watch(rbd_dev);
3821 if (ret)
3822 goto out;
3823
3824 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3825 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3826
3827out:
3828 mutex_unlock(&rbd_dev->watch_mutex);
3829 return ret;
3830}
3831
3832static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3833 {
3834 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3835
3836 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3837 cancel_work_sync(&rbd_dev->acquired_lock_work);
3838 cancel_work_sync(&rbd_dev->released_lock_work);
3839 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3840 cancel_work_sync(&rbd_dev->unlock_work);
3841}
3842
3843static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3844{
3845 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3846 cancel_tasks_sync(rbd_dev);
3847
3848 mutex_lock(&rbd_dev->watch_mutex);
3849 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3850 __rbd_unregister_watch(rbd_dev);
3851 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3852 mutex_unlock(&rbd_dev->watch_mutex);
3853
3854 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3855}
3856
3857/*
3858 * lock_rwsem must be held for write
3859 */
3860static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3861{
3862 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3863 char cookie[32];
3864 int ret;
3865
3866 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3867
3868 format_lock_cookie(rbd_dev, cookie);
3869 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3870 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3871 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3872 RBD_LOCK_TAG, cookie);
3873 if (ret) {
3874 if (ret != -EOPNOTSUPP)
3875 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3876 ret);
3877
3878 /*
3879 * Lock cookie cannot be updated on older OSDs, so do
3880 * a manual release and queue an acquire.
3881 */
3882 if (rbd_release_lock(rbd_dev))
3883 queue_delayed_work(rbd_dev->task_wq,
3884 &rbd_dev->lock_dwork, 0);
3885 } else {
3886 strcpy(rbd_dev->lock_cookie, cookie);
3887 }
3888}
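/*
 * Sketch of what the cookie update above accomplishes, assuming the
 * "prefix + watch id" cookie format implied by the sscanf() in
 * find_watcher() (ids below are made up):
 *
 *	old: "auto 94558071152896"	(id of the dead watch)
 *	new: "auto 94558071153408"	(id of the re-registered watch)
 *
 * On OSDs without set_cookie support (-EOPNOTSUPP) the lock is instead
 * released and re-acquired through the normal lock_dwork path.
 */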
3889
3890static void rbd_reregister_watch(struct work_struct *work)
3891{
3892 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3893 struct rbd_device, watch_dwork);
3894 int ret;
3895
3896 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3897
3898 mutex_lock(&rbd_dev->watch_mutex);
3899 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3900 mutex_unlock(&rbd_dev->watch_mutex);
3901 return;
3902 }
3903
3904 ret = __rbd_register_watch(rbd_dev);
3905 if (ret) {
3906 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3907 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3908 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3909 wake_requests(rbd_dev, true);
3910 } else {
3911 queue_delayed_work(rbd_dev->task_wq,
3912 &rbd_dev->watch_dwork,
3913 RBD_RETRY_DELAY);
3914 }
3915 mutex_unlock(&rbd_dev->watch_mutex);
3916 return;
3917 }
3918
3919 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3920 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3921 mutex_unlock(&rbd_dev->watch_mutex);
3922
3923 down_write(&rbd_dev->lock_rwsem);
3924 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3925 rbd_reacquire_lock(rbd_dev);
3926 up_write(&rbd_dev->lock_rwsem);
3927
3928 ret = rbd_dev_refresh(rbd_dev);
3929 if (ret)
3930 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3931}
3932
3933 /*
3934 * Synchronous osd object method call. Returns the number of bytes
3935 * returned in the outbound buffer, or a negative error code.
3936 */
3937static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3938 struct ceph_object_id *oid,
3939 struct ceph_object_locator *oloc,
3940 const char *method_name,
3941 const void *outbound,
3942 size_t outbound_size,
3943 void *inbound,
3944 size_t inbound_size)
3945 {
3946 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3947 struct page *req_page = NULL;
3948 struct page *reply_page;
3949 int ret;
3950
3951 /*
3952 * Method calls are ultimately read operations. The result
3953 * should be placed into the inbound buffer provided. They
3954 * also supply outbound data--parameters for the object
3955 * method. Currently if this is present it will be a
3956 * snapshot id.
3957 */
3958 if (outbound) {
3959 if (outbound_size > PAGE_SIZE)
3960 return -E2BIG;
3961
3962 req_page = alloc_page(GFP_KERNEL);
3963 if (!req_page)
3964 return -ENOMEM;
3965
3966 memcpy(page_address(req_page), outbound, outbound_size);
3967 }
3968
3969 reply_page = alloc_page(GFP_KERNEL);
3970 if (!reply_page) {
3971 if (req_page)
3972 __free_page(req_page);
3973 return -ENOMEM;
3974 }
3975
3976 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3977 CEPH_OSD_FLAG_READ, req_page, outbound_size,
3978 reply_page, &inbound_size);
3979 if (!ret) {
3980 memcpy(inbound, page_address(reply_page), inbound_size);
3981 ret = inbound_size;
3982 }
3983
3984 if (req_page)
3985 __free_page(req_page);
3986 __free_page(reply_page);
3987 return ret;
3988}
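/*
 * Minimal usage sketch (illustrative only; see the real callers such
 * as _rbd_dev_v2_snap_size() below):
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *	int ret;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
 *				  &rbd_dev->header_oloc, "get_size",
 *				  &snapid, sizeof(snapid),
 *				  &size_buf, sizeof(size_buf));
 *
 * On success ret is the number of reply bytes, so callers must still
 * range-check it before decoding the buffer.
 */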
3989
3990/*
3991 * lock_rwsem must be held for read
3992 */
3993static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3994{
3995 DEFINE_WAIT(wait);
3996
3997 do {
3998 /*
3999 * Note the use of mod_delayed_work() in rbd_acquire_lock()
4000 * and cancel_delayed_work() in wake_requests().
4001 */
4002 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4003 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4004 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4005 TASK_UNINTERRUPTIBLE);
4006 up_read(&rbd_dev->lock_rwsem);
4007 schedule();
4008 down_read(&rbd_dev->lock_rwsem);
4009 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4010 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4011
4012 finish_wait(&rbd_dev->lock_waitq, &wait);
4013}
4014
4015 static void rbd_queue_workfn(struct work_struct *work)
4016 {
4017 struct request *rq = blk_mq_rq_from_pdu(work);
4018 struct rbd_device *rbd_dev = rq->q->queuedata;
4019 struct rbd_img_request *img_request;
4020 struct ceph_snap_context *snapc = NULL;
4021 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4022 u64 length = blk_rq_bytes(rq);
4023 enum obj_operation_type op_type;
4024 u64 mapping_size;
4025 bool must_be_locked;
4026 int result;
4027
4028 switch (req_op(rq)) {
4029 case REQ_OP_DISCARD:
4030 case REQ_OP_WRITE_ZEROES:
4031 op_type = OBJ_OP_DISCARD;
4032 break;
4033 case REQ_OP_WRITE:
4034 op_type = OBJ_OP_WRITE;
4035 break;
4036 case REQ_OP_READ:
4037 op_type = OBJ_OP_READ;
4038 break;
4039 default:
4040 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4041 result = -EIO;
4042 goto err;
4043 }
4044
4045 /* Ignore/skip any zero-length requests */
4046
4047 if (!length) {
4048 dout("%s: zero-length request\n", __func__);
4049 result = 0;
4050 goto err_rq;
4051 }
4052
4053 /* Only reads are allowed to a read-only device */
4054
4055 if (op_type != OBJ_OP_READ) {
4056 if (rbd_dev->mapping.read_only) {
4057 result = -EROFS;
4058 goto err_rq;
4059 }
4060 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4061 }
4062
4063 /*
4064 * Quit early if the mapped snapshot no longer exists. It's
4065 * still possible the snapshot will have disappeared by the
4066 * time our request arrives at the osd, but there's no sense in
4067 * sending it if we already know.
4068 */
4069 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4070 dout("request for non-existent snapshot");
4071 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4072 result = -ENXIO;
4073 goto err_rq;
4074 }
4075
4076 if (offset && length > U64_MAX - offset + 1) {
4077 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4078 length);
4079 result = -EINVAL;
4080 goto err_rq; /* Shouldn't happen */
4081 }
4082
4083 blk_mq_start_request(rq);
4084
4085 down_read(&rbd_dev->header_rwsem);
4086 mapping_size = rbd_dev->mapping.size;
4087 if (op_type != OBJ_OP_READ) {
4088 snapc = rbd_dev->header.snapc;
4089 ceph_get_snap_context(snapc);
4090 }
4091 up_read(&rbd_dev->header_rwsem);
4092
4093 if (offset + length > mapping_size) {
4094 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4095 length, mapping_size);
4096 result = -EIO;
4097 goto err_rq;
4098 }
bf0d5f50 4099
f9bebd58
ID
4100 must_be_locked =
4101 (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
4102 (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
4103 if (must_be_locked) {
4104 down_read(&rbd_dev->lock_rwsem);
4105 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4106 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4107 if (rbd_dev->opts->exclusive) {
4108 rbd_warn(rbd_dev, "exclusive lock required");
4109 result = -EROFS;
4110 goto err_unlock;
4111 }
4112 rbd_wait_state_locked(rbd_dev);
4113 }
4114 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4115 result = -EBLACKLISTED;
4116 goto err_unlock;
4117 }
4118 }
4119
4120 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4121 snapc);
4122 if (!img_request) {
4123 result = -ENOMEM;
4124 goto err_unlock;
4125 }
4126 img_request->rq = rq;
4127 snapc = NULL; /* img_request consumes a ref */
4128
4129 if (op_type == OBJ_OP_DISCARD)
4130 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4131 NULL);
4132 else
4133 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4134 rq->bio);
4135 if (result)
4136 goto err_img_request;
4137
4138 result = rbd_img_request_submit(img_request);
4139 if (result)
4140 goto err_img_request;
4141
4142 if (must_be_locked)
4143 up_read(&rbd_dev->lock_rwsem);
4144 return;
4145
4146err_img_request:
4147 rbd_img_request_put(img_request);
4148err_unlock:
4149 if (must_be_locked)
4150 up_read(&rbd_dev->lock_rwsem);
4151err_rq:
4152 if (result)
4153 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4154 obj_op_name(op_type), length, offset, result);
4155 ceph_put_snap_context(snapc);
4156err:
4157 blk_mq_end_request(rq, errno_to_blk_status(result));
4158}
4159
4160 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4161 const struct blk_mq_queue_data *bd)
4162 {
4163 struct request *rq = bd->rq;
4164 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4165
4166 queue_work(rbd_wq, work);
4167 return BLK_STS_OK;
4168}
4169
4170static void rbd_free_disk(struct rbd_device *rbd_dev)
4171{
4172 blk_cleanup_queue(rbd_dev->disk->queue);
4173 blk_mq_free_tag_set(&rbd_dev->tag_set);
4174 put_disk(rbd_dev->disk);
4175 rbd_dev->disk = NULL;
4176}
4177
4178 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4179 struct ceph_object_id *oid,
4180 struct ceph_object_locator *oloc,
4181 void *buf, int buf_len)
4182
4183{
4184 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4185 struct ceph_osd_request *req;
4186 struct page **pages;
4187 int num_pages = calc_pages_for(0, buf_len);
4188 int ret;
4189
4190 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4191 if (!req)
4192 return -ENOMEM;
4193
4194 ceph_oid_copy(&req->r_base_oid, oid);
4195 ceph_oloc_copy(&req->r_base_oloc, oloc);
4196 req->r_flags = CEPH_OSD_FLAG_READ;
4197
4198 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4199 if (ret)
4200 goto out_req;
4201
4202 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4203 if (IS_ERR(pages)) {
4204 ret = PTR_ERR(pages);
4205 goto out_req;
4206 }
4207
4208 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4209 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4210 true);
4211
4212 ceph_osdc_start_request(osdc, req, false);
4213 ret = ceph_osdc_wait_request(osdc, req);
4214 if (ret >= 0)
4215 ceph_copy_from_page_vector(pages, buf, 0, ret);
4216
4217out_req:
4218 ceph_osdc_put_request(req);
4219 return ret;
4220}
4221
4222 /*
4223 * Read the complete header for the given rbd device. On successful
4224 * return, the rbd_dev->header field will contain up-to-date
4225 * information about the image.
4226 */
4227 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4228 {
4229 struct rbd_image_header_ondisk *ondisk = NULL;
4230 u32 snap_count = 0;
4231 u64 names_size = 0;
4232 u32 want_count;
4233 int ret;
4234
4235 /*
4236 * The complete header will include an array of its 64-bit
4237 * snapshot ids, followed by the names of those snapshots as
4238 * a contiguous block of NUL-terminated strings. Note that
4239 * the number of snapshots could change by the time we read
4240 * it in, in which case we re-read it.
4241 */
4242 do {
4243 size_t size;
4244
4245 kfree(ondisk);
4246
4247 size = sizeof (*ondisk);
4248 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4249 size += names_size;
4250 ondisk = kmalloc(size, GFP_KERNEL);
4251 if (!ondisk)
4252 return -ENOMEM;
4253
4254 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4255 &rbd_dev->header_oloc, ondisk, size);
4256 if (ret < 0)
4257 goto out;
4258 if ((size_t)ret < size) {
4259 ret = -ENXIO;
4260 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4261 size, ret);
4262 goto out;
4263 }
4264 if (!rbd_dev_ondisk_valid(ondisk)) {
4265 ret = -ENXIO;
4266 rbd_warn(rbd_dev, "invalid header");
4267 goto out;
4268 }
4269
4270 names_size = le64_to_cpu(ondisk->snap_names_len);
4271 want_count = snap_count;
4272 snap_count = le32_to_cpu(ondisk->snap_count);
4273 } while (snap_count != want_count);
4274
4275 ret = rbd_header_from_disk(rbd_dev, ondisk);
4276out:
4277 kfree(ondisk);
4278
4279 return ret;
4280}
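/*
 * Worked example of the sizing loop above (made-up numbers): with
 * snap_count == 2 and names_size == 16 the buffer allocated is
 *
 *	sizeof (*ondisk) + 2 * sizeof (struct rbd_image_snap_ondisk) + 16
 *
 * If a snapshot is created or deleted between reads, the snap_count
 * decoded from the fresh header no longer matches want_count and the
 * loop simply re-reads with the updated counts.
 */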
4281
4282/*
4283 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4284 * has disappeared from the (just updated) snapshot context.
4285 */
4286static void rbd_exists_validate(struct rbd_device *rbd_dev)
4287{
4288 u64 snap_id;
4289
4290 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4291 return;
4292
4293 snap_id = rbd_dev->spec->snap_id;
4294 if (snap_id == CEPH_NOSNAP)
4295 return;
4296
4297 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4298 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4299}
4300
4301static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4302{
4303 sector_t size;
4304
4305 /*
4306 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4307 * try to update its size. If REMOVING is set, updating size
4308 * is just useless work since the device can't be opened.
4309 */
4310 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4311 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4312 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4313 dout("setting size to %llu sectors", (unsigned long long)size);
4314 set_capacity(rbd_dev->disk, size);
4315 revalidate_disk(rbd_dev->disk);
4316 }
4317}
4318
4319 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4320 {
4321 u64 mapping_size;
4322 int ret;
4323
4324 down_write(&rbd_dev->header_rwsem);
4325 mapping_size = rbd_dev->mapping.size;
4326
4327 ret = rbd_dev_header_info(rbd_dev);
4328 if (ret)
4329 goto out;
4330
4331 /*
4332 * If there is a parent, see if it has disappeared due to the
4333 * mapped image getting flattened.
4334 */
4335 if (rbd_dev->parent) {
4336 ret = rbd_dev_v2_parent_info(rbd_dev);
4337 if (ret)
4338 goto out;
4339 }
4340
4341 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
4342 rbd_dev->mapping.size = rbd_dev->header.image_size;
4343 } else {
4344 /* validate mapped snapshot's EXISTS flag */
4345 rbd_exists_validate(rbd_dev);
4346 }
4347
4348out:
4349 up_write(&rbd_dev->header_rwsem);
4350 if (!ret && mapping_size != rbd_dev->mapping.size)
4351 rbd_dev_update_size(rbd_dev);
4352
4353 return ret;
4354}
4355
4356static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
4357 unsigned int hctx_idx, unsigned int numa_node)
4358{
4359 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4360
4361 INIT_WORK(work, rbd_queue_workfn);
4362 return 0;
4363}
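/*
 * Note on the blk-mq plumbing above: tag_set.cmd_size (set in
 * rbd_init_disk() below) is sizeof(struct work_struct), so every
 * request carries its own work item in its per-request payload.
 * rbd_queue_rq() just does blk_mq_rq_to_pdu() + queue_work(), and
 * rbd_queue_workfn() maps back with blk_mq_rq_from_pdu(); no per-I/O
 * allocation is needed.
 */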
4364
4365static const struct blk_mq_ops rbd_mq_ops = {
4366 .queue_rq = rbd_queue_rq,
4367 .init_request = rbd_init_request,
4368};
4369
4370static int rbd_init_disk(struct rbd_device *rbd_dev)
4371{
4372 struct gendisk *disk;
4373 struct request_queue *q;
4374 u64 segment_size;
4375 int err;
4376
4377 /* create gendisk info */
4378 disk = alloc_disk(single_major ?
4379 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4380 RBD_MINORS_PER_MAJOR);
4381 if (!disk)
4382 return -ENOMEM;
4383
4384 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4385 rbd_dev->dev_id);
4386 disk->major = rbd_dev->major;
4387 disk->first_minor = rbd_dev->minor;
4388 if (single_major)
4389 disk->flags |= GENHD_FL_EXT_DEVT;
4390 disk->fops = &rbd_bd_ops;
4391 disk->private_data = rbd_dev;
4392
4393 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4394 rbd_dev->tag_set.ops = &rbd_mq_ops;
4395 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4396 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4397 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4398 rbd_dev->tag_set.nr_hw_queues = 1;
4399 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4400
4401 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4402 if (err)
4403 goto out_disk;
4404
4405 q = blk_mq_init_queue(&rbd_dev->tag_set);
4406 if (IS_ERR(q)) {
4407 err = PTR_ERR(q);
4408 goto out_tag_set;
4409 }
4410
4411 queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4412 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4413
4414 /* set io sizes to object size */
4415 segment_size = rbd_obj_bytes(&rbd_dev->header);
4416 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4417 q->limits.max_sectors = queue_max_hw_sectors(q);
4418 blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
4419 blk_queue_max_segment_size(q, segment_size);
4420 blk_queue_io_min(q, segment_size);
4421 blk_queue_io_opt(q, segment_size);
4422
4423 /* enable the discard support */
4424 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4425 q->limits.discard_granularity = segment_size;
4426 q->limits.discard_alignment = segment_size;
4427 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
4428 blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
4429
4430 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4431 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
4432
4433 /*
4434 * disk_release() expects a queue ref from add_disk() and will
4435 * put it. Hold an extra ref until add_disk() is called.
4436 */
4437 WARN_ON(!blk_get_queue(q));
4438 disk->queue = q;
4439 q->queuedata = rbd_dev;
4440
4441 rbd_dev->disk = disk;
4442
4443 return 0;
4444out_tag_set:
4445 blk_mq_free_tag_set(&rbd_dev->tag_set);
4446out_disk:
4447 put_disk(disk);
4448 return err;
4449}
4450
4451/*
4452 sysfs
4453*/
4454
4455static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4456{
4457 return container_of(dev, struct rbd_device, dev);
4458}
4459
dfc5606d
YS
4460static ssize_t rbd_size_show(struct device *dev,
4461 struct device_attribute *attr, char *buf)
4462{
593a9e7b 4463 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 4464
fc71d833
AE
4465 return sprintf(buf, "%llu\n",
4466 (unsigned long long)rbd_dev->mapping.size);
dfc5606d
YS
4467}
4468
34b13184
AE
4469/*
4470 * Note this shows the features for whatever's mapped, which is not
4471 * necessarily the base image.
4472 */
4473static ssize_t rbd_features_show(struct device *dev,
4474 struct device_attribute *attr, char *buf)
4475{
4476 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4477
4478 return sprintf(buf, "0x%016llx\n",
4479 (unsigned long long)rbd_dev->mapping.features);
4480}
4481
4482static ssize_t rbd_major_show(struct device *dev,
4483 struct device_attribute *attr, char *buf)
4484{
593a9e7b 4485 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4486
fc71d833
AE
4487 if (rbd_dev->major)
4488 return sprintf(buf, "%d\n", rbd_dev->major);
4489
4490 return sprintf(buf, "(none)\n");
dd82fff1
ID
4491}
4492
4493static ssize_t rbd_minor_show(struct device *dev,
4494 struct device_attribute *attr, char *buf)
4495{
4496 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4497
4498 return sprintf(buf, "%d\n", rbd_dev->minor);
4499}
4500
4501static ssize_t rbd_client_addr_show(struct device *dev,
4502 struct device_attribute *attr, char *buf)
4503{
4504 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4505 struct ceph_entity_addr *client_addr =
4506 ceph_client_addr(rbd_dev->rbd_client->client);
4507
4508 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4509 le32_to_cpu(client_addr->nonce));
4510}
4511
4512static ssize_t rbd_client_id_show(struct device *dev,
4513 struct device_attribute *attr, char *buf)
4514 {
4515 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4516
4517 return sprintf(buf, "client%lld\n",
4518 ceph_client_gid(rbd_dev->rbd_client->client));
4519}
4520
4521static ssize_t rbd_cluster_fsid_show(struct device *dev,
4522 struct device_attribute *attr, char *buf)
4523{
4524 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4525
4526 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4527}
4528
4529static ssize_t rbd_config_info_show(struct device *dev,
4530 struct device_attribute *attr, char *buf)
4531{
4532 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4533
4534 return sprintf(buf, "%s\n", rbd_dev->config_info);
4535}
4536
4537static ssize_t rbd_pool_show(struct device *dev,
4538 struct device_attribute *attr, char *buf)
4539 {
4540 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4541
4542 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4543}
4544
4545static ssize_t rbd_pool_id_show(struct device *dev,
4546 struct device_attribute *attr, char *buf)
4547{
4548 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4549
4550 return sprintf(buf, "%llu\n",
4551 (unsigned long long) rbd_dev->spec->pool_id);
4552}
4553
4554static ssize_t rbd_name_show(struct device *dev,
4555 struct device_attribute *attr, char *buf)
4556{
4557 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4558
4559 if (rbd_dev->spec->image_name)
4560 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4561
4562 return sprintf(buf, "(unknown)\n");
4563}
4564
4565static ssize_t rbd_image_id_show(struct device *dev,
4566 struct device_attribute *attr, char *buf)
4567{
4568 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4569
4570 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4571}
4572
4573/*
4574 * Shows the name of the currently-mapped snapshot (or
4575 * RBD_SNAP_HEAD_NAME for the base image).
4576 */
4577static ssize_t rbd_snap_show(struct device *dev,
4578 struct device_attribute *attr,
4579 char *buf)
4580{
4581 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4582
4583 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4584}
4585
4586static ssize_t rbd_snap_id_show(struct device *dev,
4587 struct device_attribute *attr, char *buf)
4588{
4589 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4590
4591 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4592}
4593
4594 /*
4595 * For a v2 image, shows the chain of parent images, separated by empty
4596 * lines. For v1 images or if there is no parent, shows "(no parent
4597 * image)".
4598 */
4599static ssize_t rbd_parent_show(struct device *dev,
4600 struct device_attribute *attr,
4601 char *buf)
4602{
4603 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4604 ssize_t count = 0;
4605
4606 if (!rbd_dev->parent)
4607 return sprintf(buf, "(no parent image)\n");
4608
4609 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4610 struct rbd_spec *spec = rbd_dev->parent_spec;
4611
4612 count += sprintf(&buf[count], "%s"
4613 "pool_id %llu\npool_name %s\n"
4614 "image_id %s\nimage_name %s\n"
4615 "snap_id %llu\nsnap_name %s\n"
4616 "overlap %llu\n",
4617 !count ? "" : "\n", /* first? */
4618 spec->pool_id, spec->pool_name,
4619 spec->image_id, spec->image_name ?: "(unknown)",
4620 spec->snap_id, spec->snap_name,
4621 rbd_dev->parent_overlap);
4622 }
4623
4624 return count;
4625}
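/*
 * Example of the resulting sysfs output for a one-level clone chain
 * (made-up ids and names, for illustration only):
 *
 *	$ cat /sys/bus/rbd/devices/0/parent
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1016643c9869
 *	image_name parent-img
 *	snap_id 4
 *	snap_name snap1
 *	overlap 1073741824
 */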
4626
4627static ssize_t rbd_image_refresh(struct device *dev,
4628 struct device_attribute *attr,
4629 const char *buf,
4630 size_t size)
4631{
4632 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4633 int ret;
4634
4635 ret = rbd_dev_refresh(rbd_dev);
4636 if (ret)
4637 return ret;
4638
4639 return size;
4640}
4641
4642static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4643static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4644static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4645static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4646static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4647static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4648static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4649static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4650static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4651static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4652static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4653static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4654static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4655static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4656static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4657static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4658
4659static struct attribute *rbd_attrs[] = {
4660 &dev_attr_size.attr,
4661 &dev_attr_features.attr,
4662 &dev_attr_major.attr,
4663 &dev_attr_minor.attr,
4664 &dev_attr_client_addr.attr,
4665 &dev_attr_client_id.attr,
4666 &dev_attr_cluster_fsid.attr,
4667 &dev_attr_config_info.attr,
4668 &dev_attr_pool.attr,
4669 &dev_attr_pool_id.attr,
4670 &dev_attr_name.attr,
4671 &dev_attr_image_id.attr,
4672 &dev_attr_current_snap.attr,
4673 &dev_attr_snap_id.attr,
4674 &dev_attr_parent.attr,
4675 &dev_attr_refresh.attr,
4676 NULL
4677};
4678
4679static struct attribute_group rbd_attr_group = {
4680 .attrs = rbd_attrs,
4681};
4682
4683static const struct attribute_group *rbd_attr_groups[] = {
4684 &rbd_attr_group,
4685 NULL
4686};
4687
4688static void rbd_dev_release(struct device *dev);
4689
4690static const struct device_type rbd_device_type = {
4691 .name = "rbd",
4692 .groups = rbd_attr_groups,
4693 .release = rbd_dev_release,
4694};
4695
4696static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4697{
4698 kref_get(&spec->kref);
4699
4700 return spec;
4701}
4702
4703static void rbd_spec_free(struct kref *kref);
4704static void rbd_spec_put(struct rbd_spec *spec)
4705{
4706 if (spec)
4707 kref_put(&spec->kref, rbd_spec_free);
4708}
4709
4710static struct rbd_spec *rbd_spec_alloc(void)
4711{
4712 struct rbd_spec *spec;
4713
4714 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4715 if (!spec)
4716 return NULL;
4717
4718 spec->pool_id = CEPH_NOPOOL;
4719 spec->snap_id = CEPH_NOSNAP;
4720 kref_init(&spec->kref);
4721
4722 return spec;
4723}
4724
4725static void rbd_spec_free(struct kref *kref)
4726{
4727 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4728
4729 kfree(spec->pool_name);
4730 kfree(spec->image_id);
4731 kfree(spec->image_name);
4732 kfree(spec->snap_name);
4733 kfree(spec);
4734}
4735
4736static void rbd_dev_free(struct rbd_device *rbd_dev)
4737{
4738 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4739 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4740
4741 ceph_oid_destroy(&rbd_dev->header_oid);
4742 ceph_oloc_destroy(&rbd_dev->header_oloc);
4743 kfree(rbd_dev->config_info);
4744
4745 rbd_put_client(rbd_dev->rbd_client);
4746 rbd_spec_put(rbd_dev->spec);
4747 kfree(rbd_dev->opts);
4748 kfree(rbd_dev);
4749}
4750
4751static void rbd_dev_release(struct device *dev)
4752{
4753 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4754 bool need_put = !!rbd_dev->opts;
4755
4756 if (need_put) {
4757 destroy_workqueue(rbd_dev->task_wq);
4758 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4759 }
4760
4761 rbd_dev_free(rbd_dev);
4762
4763 /*
4764 * This is racy, but way better than putting module outside of
4765 * the release callback. The race window is pretty small, so
4766 * doing something similar to dm (dm-builtin.c) is overkill.
4767 */
4768 if (need_put)
4769 module_put(THIS_MODULE);
4770}
4771
4772static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4773 struct rbd_spec *spec)
4774{
4775 struct rbd_device *rbd_dev;
4776
4777 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4778 if (!rbd_dev)
4779 return NULL;
4780
4781 spin_lock_init(&rbd_dev->lock);
4782 INIT_LIST_HEAD(&rbd_dev->node);
4783 init_rwsem(&rbd_dev->header_rwsem);
4784
4785 rbd_dev->header.data_pool_id = CEPH_NOPOOL;
4786 ceph_oid_init(&rbd_dev->header_oid);
4787 rbd_dev->header_oloc.pool = spec->pool_id;
4788
4789 mutex_init(&rbd_dev->watch_mutex);
4790 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4791 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4792
4793 init_rwsem(&rbd_dev->lock_rwsem);
4794 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4795 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4796 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4797 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4798 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4799 init_waitqueue_head(&rbd_dev->lock_waitq);
4800
4801 rbd_dev->dev.bus = &rbd_bus_type;
4802 rbd_dev->dev.type = &rbd_device_type;
4803 rbd_dev->dev.parent = &rbd_root_dev;
4804 device_initialize(&rbd_dev->dev);
4805
4806 rbd_dev->rbd_client = rbdc;
4807 rbd_dev->spec = spec;
4808
4809 return rbd_dev;
4810}
4811
4812/*
4813 * Create a mapping rbd_dev.
4814 */
4815static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4816 struct rbd_spec *spec,
4817 struct rbd_options *opts)
4818{
4819 struct rbd_device *rbd_dev;
4820
4821 rbd_dev = __rbd_dev_create(rbdc, spec);
4822 if (!rbd_dev)
4823 return NULL;
4824
4825 rbd_dev->opts = opts;
4826
4827 /* get an id and fill in device name */
4828 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4829 minor_to_rbd_dev_id(1 << MINORBITS),
4830 GFP_KERNEL);
4831 if (rbd_dev->dev_id < 0)
4832 goto fail_rbd_dev;
4833
4834 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4835 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4836 rbd_dev->name);
4837 if (!rbd_dev->task_wq)
4838 goto fail_dev_id;
4839
4840 /* we have a ref from do_rbd_add() */
4841 __module_get(THIS_MODULE);
4842
4843 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4844 return rbd_dev;
4845
4846fail_dev_id:
4847 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4848fail_rbd_dev:
4849 rbd_dev_free(rbd_dev);
4850 return NULL;
4851}
4852
4853static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4854{
4855 if (rbd_dev)
4856 put_device(&rbd_dev->dev);
4857}
4858
4859/*
4860 * Get the size and object order for an image snapshot, or if
4861 * snap_id is CEPH_NOSNAP, gets this information for the base
4862 * image.
4863 */
4864static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4865 u8 *order, u64 *snap_size)
4866{
4867 __le64 snapid = cpu_to_le64(snap_id);
4868 int ret;
4869 struct {
4870 u8 order;
4871 __le64 size;
4872 } __attribute__ ((packed)) size_buf = { 0 };
4873
4874 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4875 &rbd_dev->header_oloc, "get_size",
4876 &snapid, sizeof(snapid),
4877 &size_buf, sizeof(size_buf));
4878 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4879 if (ret < 0)
4880 return ret;
4881 if (ret < sizeof (size_buf))
4882 return -ERANGE;
4883
4884 if (order) {
4885 *order = size_buf.order;
4886 dout(" order %u", (unsigned int)*order);
4887 }
4888 *snap_size = le64_to_cpu(size_buf.size);
4889
4890 dout(" snap_id 0x%016llx snap_size = %llu\n",
4891 (unsigned long long)snap_id,
4892 (unsigned long long)*snap_size);
4893
4894 return 0;
4895}
4896
4897static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4898{
4899 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4900 &rbd_dev->header.obj_order,
4901 &rbd_dev->header.image_size);
4902}
4903
4904static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4905{
4906 void *reply_buf;
4907 int ret;
4908 void *p;
4909
4910 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4911 if (!reply_buf)
4912 return -ENOMEM;
4913
4914 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4915 &rbd_dev->header_oloc, "get_object_prefix",
4916 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4917 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4918 if (ret < 0)
4919 goto out;
4920
4921 p = reply_buf;
4922 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4923 p + ret, NULL, GFP_NOIO);
4924 ret = 0;
4925
4926 if (IS_ERR(rbd_dev->header.object_prefix)) {
4927 ret = PTR_ERR(rbd_dev->header.object_prefix);
4928 rbd_dev->header.object_prefix = NULL;
4929 } else {
4930 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
4931 }
4932out:
4933 kfree(reply_buf);
4934
4935 return ret;
4936}
4937
4938static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4939 u64 *snap_features)
4940{
4941 __le64 snapid = cpu_to_le64(snap_id);
4942 struct {
4943 __le64 features;
4944 __le64 incompat;
4945 } __attribute__ ((packed)) features_buf = { 0 };
4946 u64 unsup;
4947 int ret;
4948
4949 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4950 &rbd_dev->header_oloc, "get_features",
4951 &snapid, sizeof(snapid),
4952 &features_buf, sizeof(features_buf));
4953 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4954 if (ret < 0)
4955 return ret;
4956 if (ret < sizeof (features_buf))
4957 return -ERANGE;
4958
4959 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4960 if (unsup) {
4961 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4962 unsup);
4963 return -ENXIO;
4964 }
4965
4966 *snap_features = le64_to_cpu(features_buf.features);
4967
4968 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4969 (unsigned long long)snap_id,
4970 (unsigned long long)*snap_features,
4971 (unsigned long long)le64_to_cpu(features_buf.incompat));
4972
4973 return 0;
4974}
4975
4976static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4977{
4978 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4979 &rbd_dev->header.features);
4980}
4981
4982static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4983{
4984 struct rbd_spec *parent_spec;
4985 size_t size;
4986 void *reply_buf = NULL;
4987 __le64 snapid;
4988 void *p;
4989 void *end;
4990 u64 pool_id;
4991 char *image_id;
4992 u64 snap_id;
4993 u64 overlap;
4994 int ret;
4995
4996 parent_spec = rbd_spec_alloc();
4997 if (!parent_spec)
4998 return -ENOMEM;
4999
5000 size = sizeof (__le64) + /* pool_id */
5001 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
5002 sizeof (__le64) + /* snap_id */
5003 sizeof (__le64); /* overlap */
5004 reply_buf = kmalloc(size, GFP_KERNEL);
5005 if (!reply_buf) {
5006 ret = -ENOMEM;
5007 goto out_err;
5008 }
5009
5010 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5011 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5012 &rbd_dev->header_oloc, "get_parent",
5013 &snapid, sizeof(snapid), reply_buf, size);
5014 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5015 if (ret < 0)
5016 goto out_err;
5017
5018 p = reply_buf;
5019 end = reply_buf + ret;
5020 ret = -ERANGE;
5021 ceph_decode_64_safe(&p, end, pool_id, out_err);
5022 if (pool_id == CEPH_NOPOOL) {
5023 /*
5024 * Either the parent never existed, or we have
5025 * record of it but the image got flattened so it no
5026 * longer has a parent. When the parent of a
5027 * layered image disappears we immediately set the
5028 * overlap to 0. The effect of this is that all new
5029 * requests will be treated as if the image had no
5030 * parent.
5031 */
5032 if (rbd_dev->parent_overlap) {
5033 rbd_dev->parent_overlap = 0;
5034 rbd_dev_parent_put(rbd_dev);
5035 pr_info("%s: clone image has been flattened\n",
5036 rbd_dev->disk->disk_name);
5037 }
5038
5039 goto out; /* No parent? No problem. */
5040 }
5041
5042 /* The ceph file layout needs to fit pool id in 32 bits */
5043
5044 ret = -EIO;
5045 if (pool_id > (u64)U32_MAX) {
5046 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5047 (unsigned long long)pool_id, U32_MAX);
5048 goto out_err;
5049 }
5050
5051 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5052 if (IS_ERR(image_id)) {
5053 ret = PTR_ERR(image_id);
5054 goto out_err;
5055 }
5056 ceph_decode_64_safe(&p, end, snap_id, out_err);
5057 ceph_decode_64_safe(&p, end, overlap, out_err);
5058
5059 /*
5060 * The parent won't change (except when the clone is
5061 * flattened, which is handled above). So we only need to
5062 * record the parent spec if we have not already done so.
5063 */
5064 if (!rbd_dev->parent_spec) {
5065 parent_spec->pool_id = pool_id;
5066 parent_spec->image_id = image_id;
5067 parent_spec->snap_id = snap_id;
5068 rbd_dev->parent_spec = parent_spec;
5069 parent_spec = NULL; /* rbd_dev now owns this */
5070 } else {
5071 kfree(image_id);
5072 }
5073
5074 /*
5075 * We always update the parent overlap. If it's zero we issue
5076 * a warning, as we will proceed as if there was no parent.
5077 */
5078 if (!overlap) {
5079 if (parent_spec) {
5080 /* refresh, careful to warn just once */
5081 if (rbd_dev->parent_overlap)
5082 rbd_warn(rbd_dev,
5083 "clone now standalone (overlap became 0)");
3b5cf2a2 5084 } else {
cf32bd9c
ID
5085 /* initial probe */
5086 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
3b5cf2a2 5087 }
70cf49cf 5088 }
cf32bd9c
ID
5089 rbd_dev->parent_overlap = overlap;
5090
86b00e0d
AE
5091out:
5092 ret = 0;
5093out_err:
5094 kfree(reply_buf);
5095 rbd_spec_put(parent_spec);
5096
5097 return ret;
5098}
5099
cc070d59
AE
5100static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5101{
5102 struct {
5103 __le64 stripe_unit;
5104 __le64 stripe_count;
5105 } __attribute__ ((packed)) striping_info_buf = { 0 };
5106 size_t size = sizeof (striping_info_buf);
5107 void *p;
5108 u64 obj_size;
5109 u64 stripe_unit;
5110 u64 stripe_count;
5111 int ret;
5112
ecd4a68a
ID
5113 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5114 &rbd_dev->header_oloc, "get_stripe_unit_count",
5115 NULL, 0, &striping_info_buf, size);
cc070d59
AE
5116 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5117 if (ret < 0)
5118 return ret;
5119 if (ret < size)
5120 return -ERANGE;
5121
5122 /*
5123 * We don't actually support the "fancy striping" feature
5124 * (STRIPINGV2) yet, but if the striping sizes are the
5125 * defaults the behavior is the same as before. So find
5126 * out, and only fail if the image has non-default values.
5127 */
5128 ret = -EINVAL;
5bc3fb17 5129 obj_size = rbd_obj_bytes(&rbd_dev->header);
cc070d59
AE
5130 p = &striping_info_buf;
5131 stripe_unit = ceph_decode_64(&p);
5132 if (stripe_unit != obj_size) {
5133 rbd_warn(rbd_dev, "unsupported stripe unit "
5134 "(got %llu want %llu)",
5135 stripe_unit, obj_size);
5136 return -EINVAL;
5137 }
5138 stripe_count = ceph_decode_64(&p);
5139 if (stripe_count != 1) {
5140 rbd_warn(rbd_dev, "unsupported stripe count "
5141 "(got %llu want 1)", stripe_count);
5142 return -EINVAL;
5143 }
500d0c0f
AE
5144 rbd_dev->header.stripe_unit = stripe_unit;
5145 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
5146
5147 return 0;
5148}
5149
7e97332e
ID
5150static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5151{
5152 __le64 data_pool_id;
5153 int ret;
5154
5155 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5156 &rbd_dev->header_oloc, "get_data_pool",
5157 NULL, 0, &data_pool_id, sizeof(data_pool_id));
5158 if (ret < 0)
5159 return ret;
5160 if (ret < sizeof(data_pool_id))
5161 return -EBADMSG;
5162
5163 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5164 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5165 return 0;
5166}
5167
9e15b77d
AE
5168static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5169{
ecd4a68a 5170 CEPH_DEFINE_OID_ONSTACK(oid);
9e15b77d
AE
5171 size_t image_id_size;
5172 char *image_id;
5173 void *p;
5174 void *end;
5175 size_t size;
5176 void *reply_buf = NULL;
5177 size_t len = 0;
5178 char *image_name = NULL;
5179 int ret;
5180
5181 rbd_assert(!rbd_dev->spec->image_name);
5182
69e7a02f
AE
5183 len = strlen(rbd_dev->spec->image_id);
5184 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
5185 image_id = kmalloc(image_id_size, GFP_KERNEL);
5186 if (!image_id)
5187 return NULL;
5188
5189 p = image_id;
4157976b 5190 end = image_id + image_id_size;
57385b51 5191 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
5192
5193 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5194 reply_buf = kmalloc(size, GFP_KERNEL);
5195 if (!reply_buf)
5196 goto out;
5197
ecd4a68a
ID
5198 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5199 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5200 "dir_get_name", image_id, image_id_size,
5201 reply_buf, size);
9e15b77d
AE
5202 if (ret < 0)
5203 goto out;
5204 p = reply_buf;
f40eb349
AE
5205 end = reply_buf + ret;
5206
9e15b77d
AE
5207 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5208 if (IS_ERR(image_name))
5209 image_name = NULL;
5210 else
5211 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5212out:
5213 kfree(reply_buf);
5214 kfree(image_id);
5215
5216 return image_name;
5217}
5218
2ad3d716
AE
5219static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5220{
5221 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5222 const char *snap_name;
5223 u32 which = 0;
5224
5225 /* Skip over names until we find the one we are looking for */
5226
5227 snap_name = rbd_dev->header.snap_names;
5228 while (which < snapc->num_snaps) {
5229 if (!strcmp(name, snap_name))
5230 return snapc->snaps[which];
5231 snap_name += strlen(snap_name) + 1;
5232 which++;
5233 }
5234 return CEPH_NOSNAP;
5235}
5236
5237static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5238{
5239 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5240 u32 which;
5241 bool found = false;
5242 u64 snap_id;
5243
5244 for (which = 0; !found && which < snapc->num_snaps; which++) {
5245 const char *snap_name;
5246
5247 snap_id = snapc->snaps[which];
5248 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
efadc98a
JD
5249 if (IS_ERR(snap_name)) {
5250 /* ignore no-longer existing snapshots */
5251 if (PTR_ERR(snap_name) == -ENOENT)
5252 continue;
5253 else
5254 break;
5255 }
2ad3d716
AE
5256 found = !strcmp(name, snap_name);
5257 kfree(snap_name);
5258 }
5259 return found ? snap_id : CEPH_NOSNAP;
5260}
5261
5262/*
5263 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5264 * no snapshot by that name is found, or if an error occurs.
5265 */
5266static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5267{
5268 if (rbd_dev->image_format == 1)
5269 return rbd_v1_snap_id_by_name(rbd_dev, name);
5270
5271 return rbd_v2_snap_id_by_name(rbd_dev, name);
5272}
5273
9e15b77d 5274/*
04077599
ID
5275 * An image being mapped will have everything but the snap id.
5276 */
5277static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5278{
5279 struct rbd_spec *spec = rbd_dev->spec;
5280
5281 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5282 rbd_assert(spec->image_id && spec->image_name);
5283 rbd_assert(spec->snap_name);
5284
5285 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5286 u64 snap_id;
5287
5288 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5289 if (snap_id == CEPH_NOSNAP)
5290 return -ENOENT;
5291
5292 spec->snap_id = snap_id;
5293 } else {
5294 spec->snap_id = CEPH_NOSNAP;
5295 }
5296
5297 return 0;
5298}
5299
5300/*
5301 * A parent image will have all ids but none of the names.
e1d4213f 5302 *
04077599
ID
5303 * All names in an rbd spec are dynamically allocated. It's OK if we
5304 * can't figure out the name for an image id.
9e15b77d 5305 */
04077599 5306static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
9e15b77d 5307{
2e9f7f1c
AE
5308 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5309 struct rbd_spec *spec = rbd_dev->spec;
5310 const char *pool_name;
5311 const char *image_name;
5312 const char *snap_name;
9e15b77d
AE
5313 int ret;
5314
04077599
ID
5315 rbd_assert(spec->pool_id != CEPH_NOPOOL);
5316 rbd_assert(spec->image_id);
5317 rbd_assert(spec->snap_id != CEPH_NOSNAP);
9e15b77d 5318
2e9f7f1c 5319 /* Get the pool name; we have to make our own copy of this */
9e15b77d 5320
2e9f7f1c
AE
5321 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5322 if (!pool_name) {
5323 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
5324 return -EIO;
5325 }
2e9f7f1c
AE
5326 pool_name = kstrdup(pool_name, GFP_KERNEL);
5327 if (!pool_name)
9e15b77d
AE
5328 return -ENOMEM;
5329
5330 /* Fetch the image name; tolerate failure here */
5331
2e9f7f1c
AE
5332 image_name = rbd_dev_image_name(rbd_dev);
5333 if (!image_name)
06ecc6cb 5334 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 5335
04077599 5336 /* Fetch the snapshot name */
9e15b77d 5337
2e9f7f1c 5338 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
da6a6b63
JD
5339 if (IS_ERR(snap_name)) {
5340 ret = PTR_ERR(snap_name);
9e15b77d 5341 goto out_err;
2e9f7f1c
AE
5342 }
5343
5344 spec->pool_name = pool_name;
5345 spec->image_name = image_name;
5346 spec->snap_name = snap_name;
9e15b77d
AE
5347
5348 return 0;
04077599 5349
9e15b77d 5350out_err:
2e9f7f1c
AE
5351 kfree(image_name);
5352 kfree(pool_name);
9e15b77d
AE
5353 return ret;
5354}
5355
cc4a38bd 5356static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
5357{
5358 size_t size;
5359 int ret;
5360 void *reply_buf;
5361 void *p;
5362 void *end;
5363 u64 seq;
5364 u32 snap_count;
5365 struct ceph_snap_context *snapc;
5366 u32 i;
5367
5368 /*
5369 * We'll need room for the seq value (maximum snapshot id),
5370 * snapshot count, and an array of that many snapshot ids.
5371 * For now we have a fixed upper limit on the number we're
5372 * prepared to receive.
5373 */
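	/*
	 * A sketch of the get_snapcontext reply as decoded below
	 * (little-endian on the wire; layout inferred from the
	 * ceph_decode_*() calls that follow):
	 *
	 *	__le64 seq;			maximum snapshot id
	 *	__le32 snap_count;
	 *	__le64 snaps[snap_count];	snapshot ids
	 */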
5374 size = sizeof (__le64) + sizeof (__le32) +
5375 RBD_MAX_SNAP_COUNT * sizeof (__le64);
5376 reply_buf = kzalloc(size, GFP_KERNEL);
5377 if (!reply_buf)
5378 return -ENOMEM;
5379
ecd4a68a
ID
5380 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5381 &rbd_dev->header_oloc, "get_snapcontext",
5382 NULL, 0, reply_buf, size);
36be9a76 5383 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
5384 if (ret < 0)
5385 goto out;
5386
35d489f9 5387 p = reply_buf;
57385b51
AE
5388 end = reply_buf + ret;
5389 ret = -ERANGE;
35d489f9
AE
5390 ceph_decode_64_safe(&p, end, seq, out);
5391 ceph_decode_32_safe(&p, end, snap_count, out);
5392
5393 /*
5394 * Make sure the reported number of snapshot ids wouldn't go
5395 * beyond the end of our buffer. But before checking that,
5396 * make sure the computed size of the snapshot context we
5397 * allocate is representable in a size_t.
5398 */
5399 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5400 / sizeof (u64)) {
5401 ret = -EINVAL;
5402 goto out;
5403 }
5404 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5405 goto out;
468521c1 5406 ret = 0;
35d489f9 5407
812164f8 5408 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
5409 if (!snapc) {
5410 ret = -ENOMEM;
5411 goto out;
5412 }
35d489f9 5413 snapc->seq = seq;
35d489f9
AE
5414 for (i = 0; i < snap_count; i++)
5415 snapc->snaps[i] = ceph_decode_64(&p);
5416
49ece554 5417 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
5418 rbd_dev->header.snapc = snapc;
5419
5420 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 5421 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
5422out:
5423 kfree(reply_buf);
5424
57385b51 5425 return ret;
35d489f9
AE
5426}
5427
54cac61f
AE
5428static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5429 u64 snap_id)
b8b1e2db
AE
5430{
5431 size_t size;
5432 void *reply_buf;
54cac61f 5433 __le64 snapid;
b8b1e2db
AE
5434 int ret;
5435 void *p;
5436 void *end;
b8b1e2db
AE
5437 char *snap_name;
5438
5439 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5440 reply_buf = kmalloc(size, GFP_KERNEL);
5441 if (!reply_buf)
5442 return ERR_PTR(-ENOMEM);
5443
54cac61f 5444 snapid = cpu_to_le64(snap_id);
ecd4a68a
ID
5445 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5446 &rbd_dev->header_oloc, "get_snapshot_name",
5447 &snapid, sizeof(snapid), reply_buf, size);
36be9a76 5448 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
5449 if (ret < 0) {
5450 snap_name = ERR_PTR(ret);
b8b1e2db 5451 goto out;
f40eb349 5452 }
b8b1e2db
AE
5453
5454 p = reply_buf;
f40eb349 5455 end = reply_buf + ret;
e5c35534 5456 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 5457 if (IS_ERR(snap_name))
b8b1e2db 5458 goto out;
b8b1e2db 5459
f40eb349 5460 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 5461 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
5462out:
5463 kfree(reply_buf);
5464
f40eb349 5465 return snap_name;
b8b1e2db
AE
5466}
5467
2df3fac7 5468static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 5469{
2df3fac7 5470 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 5471 int ret;
117973fb 5472
1617e40c
JD
5473 ret = rbd_dev_v2_image_size(rbd_dev);
5474 if (ret)
cfbf6377 5475 return ret;
1617e40c 5476
2df3fac7
AE
5477 if (first_time) {
5478 ret = rbd_dev_v2_header_onetime(rbd_dev);
5479 if (ret)
cfbf6377 5480 return ret;
2df3fac7
AE
5481 }
5482
cc4a38bd 5483 ret = rbd_dev_v2_snap_context(rbd_dev);
d194cd1d
ID
5484 if (ret && first_time) {
5485 kfree(rbd_dev->header.object_prefix);
5486 rbd_dev->header.object_prefix = NULL;
5487 }
117973fb
AE
5488
5489 return ret;
5490}
5491
a720ae09
ID
5492static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5493{
5494 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5495
5496 if (rbd_dev->image_format == 1)
5497 return rbd_dev_v1_header_info(rbd_dev);
5498
5499 return rbd_dev_v2_header_info(rbd_dev);
5500}
5501
e28fff26
AE
5502/*
5503 * Skips over white space at *buf, and updates *buf to point to the
5504 * first found non-space character (if any). Returns the length of
593a9e7b
AE
5505 * the token (string of non-white space characters) found. Note
5506 * that *buf must be terminated with '\0'.
e28fff26
AE
5507 */
5508static inline size_t next_token(const char **buf)
5509{
5510 /*
5511 * These are the characters that produce nonzero for
5512 * isspace() in the "C" and "POSIX" locales.
5513 */
5514 const char *spaces = " \f\n\r\t\v";
5515
5516 *buf += strspn(*buf, spaces); /* Find start of token */
5517
5518 return strcspn(*buf, spaces); /* Return token length */
5519}
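
/*
 * For example (illustrative input only): with *buf pointing at
 * "  rbd foo", next_token() advances *buf to "rbd foo" and returns 3,
 * the length of the token "rbd".
 */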
5520
ea3352f4
AE
5521/*
5522 * Finds the next token in *buf, dynamically allocates a buffer big
5523 * enough to hold a copy of it, and copies the token into the new
5524 * buffer. The copy is guaranteed to be terminated with '\0'. Note
5525 * that a duplicate buffer is created even for a zero-length token.
5526 *
5527 * Returns a pointer to the newly-allocated duplicate, or a null
5528 * pointer if memory for the duplicate was not available. If
5529 * the lenp argument is a non-null pointer, the length of the token
5530 * (not including the '\0') is returned in *lenp.
5531 *
5532 * If successful, the *buf pointer will be updated to point beyond
5533 * the end of the found token.
5534 *
5535 * Note: uses GFP_KERNEL for allocation.
5536 */
5537static inline char *dup_token(const char **buf, size_t *lenp)
5538{
5539 char *dup;
5540 size_t len;
5541
5542 len = next_token(buf);
4caf35f9 5543 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
5544 if (!dup)
5545 return NULL;
ea3352f4
AE
5546 *(dup + len) = '\0';
5547 *buf += len;
5548
5549 if (lenp)
5550 *lenp = len;
5551
5552 return dup;
5553}
5554
a725f65e 5555/*
859c31df
AE
5556 * Parse the options provided for an "rbd add" (i.e., rbd image
5557 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
5558 * and the data written is passed here via a NUL-terminated buffer.
5559 * Returns 0 if successful or an error code otherwise.
d22f76e7 5560 *
859c31df
AE
5561 * The information extracted from these options is recorded in
5562 * the other parameters which return dynamically-allocated
5563 * structures:
5564 * ceph_opts
5565 * The address of a pointer that will refer to a ceph options
5566 * structure. Caller must release the returned pointer using
5567 * ceph_destroy_options() when it is no longer needed.
5568 * rbd_opts
5569 * Address of an rbd options pointer. Fully initialized by
5570 * this function; caller must release with kfree().
5571 * spec
5572 * Address of an rbd image specification pointer. Fully
5573 * initialized by this function based on parsed options.
5574 * Caller must release with rbd_spec_put().
5575 *
5576 * The options passed take this form:
5577 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
5578 * where:
5579 * <mon_addrs>
5580 * A comma-separated list of one or more monitor addresses.
5581 * A monitor address is an ip address, optionally followed
5582 * by a port number (separated by a colon).
5583 * I.e.: ip1[:port1][,ip2[:port2]...]
5584 * <options>
5585 * A comma-separated list of ceph and/or rbd options.
5586 * <pool_name>
5587 * The name of the rados pool containing the rbd image.
5588 * <image_name>
5589 * The name of the image in that pool to map.
5590 * <snap_name>
5591 * An optional snapshot name. If provided, the mapping will
5592 * present data from the image at the time that snapshot was
5593 * created. The image head is used if no snapshot name is
5594 * provided. Snapshot mappings are always read-only.
a725f65e 5595 */
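/*
 * A hypothetical example of such a buffer, written to
 * /sys/bus/rbd/add (the monitor address, key, pool, image and
 * snapshot names below are illustrative only):
 *
 *	1.2.3.4:6789 name=admin,secret=<key> rbd myimage mysnap
 */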
859c31df 5596static int rbd_add_parse_args(const char *buf,
dc79b113 5597 struct ceph_options **ceph_opts,
859c31df
AE
5598 struct rbd_options **opts,
5599 struct rbd_spec **rbd_spec)
e28fff26 5600{
d22f76e7 5601 size_t len;
859c31df 5602 char *options;
0ddebc0c 5603 const char *mon_addrs;
ecb4dc22 5604 char *snap_name;
0ddebc0c 5605 size_t mon_addrs_size;
859c31df 5606 struct rbd_spec *spec = NULL;
4e9afeba 5607 struct rbd_options *rbd_opts = NULL;
859c31df 5608 struct ceph_options *copts;
dc79b113 5609 int ret;
e28fff26
AE
5610
5611 /* The first four tokens are required */
5612
7ef3214a 5613 len = next_token(&buf);
4fb5d671
AE
5614 if (!len) {
5615 rbd_warn(NULL, "no monitor address(es) provided");
5616 return -EINVAL;
5617 }
0ddebc0c 5618 mon_addrs = buf;
f28e565a 5619 mon_addrs_size = len + 1;
7ef3214a 5620 buf += len;
a725f65e 5621
dc79b113 5622 ret = -EINVAL;
f28e565a
AE
5623 options = dup_token(&buf, NULL);
5624 if (!options)
dc79b113 5625 return -ENOMEM;
4fb5d671
AE
5626 if (!*options) {
5627 rbd_warn(NULL, "no options provided");
5628 goto out_err;
5629 }
e28fff26 5630
859c31df
AE
5631 spec = rbd_spec_alloc();
5632 if (!spec)
f28e565a 5633 goto out_mem;
859c31df
AE
5634
5635 spec->pool_name = dup_token(&buf, NULL);
5636 if (!spec->pool_name)
5637 goto out_mem;
4fb5d671
AE
5638 if (!*spec->pool_name) {
5639 rbd_warn(NULL, "no pool name provided");
5640 goto out_err;
5641 }
e28fff26 5642
69e7a02f 5643 spec->image_name = dup_token(&buf, NULL);
859c31df 5644 if (!spec->image_name)
f28e565a 5645 goto out_mem;
4fb5d671
AE
5646 if (!*spec->image_name) {
5647 rbd_warn(NULL, "no image name provided");
5648 goto out_err;
5649 }
d4b125e9 5650
f28e565a
AE
5651 /*
5652 * Snapshot name is optional; default is to use "-"
5653 * (indicating the head/no snapshot).
5654 */
3feeb894 5655 len = next_token(&buf);
820a5f3e 5656 if (!len) {
3feeb894
AE
5657 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5658 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 5659 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 5660 ret = -ENAMETOOLONG;
f28e565a 5661 goto out_err;
849b4260 5662 }
ecb4dc22
AE
5663 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5664 if (!snap_name)
f28e565a 5665 goto out_mem;
ecb4dc22
AE
5666 *(snap_name + len) = '\0';
5667 spec->snap_name = snap_name;
e5c35534 5668
0ddebc0c 5669 /* Initialize all rbd options to the defaults */
e28fff26 5670
4e9afeba
AE
5671 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5672 if (!rbd_opts)
5673 goto out_mem;
5674
5675 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
b5584180 5676 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
80de1912 5677 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
e010dd0a 5678 rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
d22f76e7 5679
859c31df 5680 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 5681 mon_addrs + mon_addrs_size - 1,
4e9afeba 5682 parse_rbd_opts_token, rbd_opts);
859c31df
AE
5683 if (IS_ERR(copts)) {
5684 ret = PTR_ERR(copts);
dc79b113
AE
5685 goto out_err;
5686 }
859c31df
AE
5687 kfree(options);
5688
5689 *ceph_opts = copts;
4e9afeba 5690 *opts = rbd_opts;
859c31df 5691 *rbd_spec = spec;
0ddebc0c 5692
dc79b113 5693 return 0;
f28e565a 5694out_mem:
dc79b113 5695 ret = -ENOMEM;
d22f76e7 5696out_err:
859c31df
AE
5697 kfree(rbd_opts);
5698 rbd_spec_put(spec);
f28e565a 5699 kfree(options);
d22f76e7 5700
dc79b113 5701 return ret;
a725f65e
AE
5702}
5703
30ba1f02
ID
5704/*
5705 * Return pool id (>= 0) or a negative error code.
5706 */
5707static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5708{
a319bf56 5709 struct ceph_options *opts = rbdc->client->options;
30ba1f02 5710 u64 newest_epoch;
30ba1f02
ID
5711 int tries = 0;
5712 int ret;
5713
5714again:
5715 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5716 if (ret == -ENOENT && tries++ < 1) {
d0b19705
ID
5717 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5718 &newest_epoch);
30ba1f02
ID
5719 if (ret < 0)
5720 return ret;
5721
5722 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
7cca78c9 5723 ceph_osdc_maybe_request_map(&rbdc->client->osdc);
30ba1f02 5724 (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
a319bf56
ID
5725 newest_epoch,
5726 opts->mount_timeout);
30ba1f02
ID
5727 goto again;
5728 } else {
5729 /* the osdmap we have is new enough */
5730 return -ENOENT;
5731 }
5732 }
5733
5734 return ret;
5735}
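
/*
 * Design note (inferred from the retry above): a lookup can race with
 * pool creation, so a first -ENOENT triggers one osdmap refresh; only
 * if the pool is still missing from an osdmap at least as new as the
 * monitor's newest epoch is -ENOENT reported to the caller.
 */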
5736
e010dd0a
ID
5737static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5738{
5739 down_write(&rbd_dev->lock_rwsem);
5740 if (__rbd_is_lock_owner(rbd_dev))
5741 rbd_unlock(rbd_dev);
5742 up_write(&rbd_dev->lock_rwsem);
5743}
5744
5745static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5746{
5747 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5748 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5749 return -EINVAL;
5750 }
5751
5752 /* FIXME: "rbd map --exclusive" should be interruptible */
5753 down_read(&rbd_dev->lock_rwsem);
5754 rbd_wait_state_locked(rbd_dev);
5755 up_read(&rbd_dev->lock_rwsem);
5756 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
5757 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5758 return -EROFS;
5759 }
5760
5761 return 0;
5762}
5763
589d30e0
AE
5764/*
5765 * An rbd format 2 image has a unique identifier, distinct from the
5766 * name given to it by the user. Internally, that identifier is
5767 * what's used to specify the names of objects related to the image.
5768 *
5769 * A special "rbd id" object is used to map an rbd image name to its
5770 * id. If that object doesn't exist, then there is no v2 rbd image
5771 * with the supplied name.
5772 *
5773 * This function will record the given rbd_dev's image_id field if
5774 * it can be determined, and in that case will return 0. If any
5775 * errors occur a negative errno will be returned and the rbd_dev's
5776 * image_id field will be unchanged (and should be NULL).
5777 */
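/*
 * Illustrative example, assuming RBD_ID_PREFIX is "rbd_id.": the id
 * of a format 2 image named "foo" is obtained by calling the "get_id"
 * method on the object "rbd_id.foo".
 */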
5778static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5779{
5780 int ret;
5781 size_t size;
ecd4a68a 5782 CEPH_DEFINE_OID_ONSTACK(oid);
589d30e0 5783 void *response;
c0fba368 5784 char *image_id;
2f82ee54 5785
2c0d0a10
AE
5786 /*
5787 * When probing a parent image, the image id is already
5788 * known (and the image name likely is not). There's no
c0fba368
AE
5789 * need to fetch the image id again in this case. We
5790 * do still need to set the image format though.
2c0d0a10 5791 */
c0fba368
AE
5792 if (rbd_dev->spec->image_id) {
5793 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5794
2c0d0a10 5795 return 0;
c0fba368 5796 }
2c0d0a10 5797
589d30e0
AE
5798 /*
5799 * First, see if the format 2 image id object exists, and if
5800 * so, get the image's persistent id from it.
5801 */
ecd4a68a
ID
5802 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5803 rbd_dev->spec->image_name);
5804 if (ret)
5805 return ret;
5806
5807 dout("rbd id object name is %s\n", oid.name);
589d30e0
AE
5808
5809 /* Response will be an encoded string, which includes a length */
5810
5811 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5812 response = kzalloc(size, GFP_NOIO);
5813 if (!response) {
5814 ret = -ENOMEM;
5815 goto out;
5816 }
5817
c0fba368
AE
5818 /* If it doesn't exist we'll assume it's a format 1 image */
5819
ecd4a68a
ID
5820 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5821 "get_id", NULL, 0,
5822 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 5823 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
5824 if (ret == -ENOENT) {
5825 image_id = kstrdup("", GFP_KERNEL);
5826 ret = image_id ? 0 : -ENOMEM;
5827 if (!ret)
5828 rbd_dev->image_format = 1;
7dd440c9 5829 } else if (ret >= 0) {
c0fba368
AE
5830 void *p = response;
5831
5832 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 5833 NULL, GFP_NOIO);
461f758a 5834 ret = PTR_ERR_OR_ZERO(image_id);
c0fba368
AE
5835 if (!ret)
5836 rbd_dev->image_format = 2;
c0fba368
AE
5837 }
5838
5839 if (!ret) {
5840 rbd_dev->spec->image_id = image_id;
5841 dout("image_id is %s\n", image_id);
589d30e0
AE
5842 }
5843out:
5844 kfree(response);
ecd4a68a 5845 ceph_oid_destroy(&oid);
589d30e0
AE
5846 return ret;
5847}
5848
3abef3b3
AE
5849/*
5850 * Undo whatever state changes are made by v1 or v2 header info
5851 * call.
5852 */
6fd48b3b
AE
5853static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5854{
5855 struct rbd_image_header *header;
5856
e69b8d41 5857 rbd_dev_parent_put(rbd_dev);
6fd48b3b
AE
5858
5859 /* Free dynamic fields from the header, then zero it out */
5860
5861 header = &rbd_dev->header;
812164f8 5862 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
5863 kfree(header->snap_sizes);
5864 kfree(header->snap_names);
5865 kfree(header->object_prefix);
5866 memset(header, 0, sizeof (*header));
5867}
5868
2df3fac7 5869static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9
AE
5870{
5871 int ret;
a30b71b9 5872
1e130199 5873 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 5874 if (ret)
b1b5402a
AE
5875 goto out_err;
5876
2df3fac7
AE
5877 /*
5878 * Get and check the features for the image. Currently the
5879 * features are assumed to never change.
5880 */
b1b5402a 5881 ret = rbd_dev_v2_features(rbd_dev);
57385b51 5882 if (ret)
9d475de5 5883 goto out_err;
35d489f9 5884
cc070d59
AE
5885 /* If the image supports fancy striping, get its parameters */
5886
5887 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5888 ret = rbd_dev_v2_striping_info(rbd_dev);
5889 if (ret < 0)
5890 goto out_err;
5891 }
a30b71b9 5892
7e97332e
ID
5893 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5894 ret = rbd_dev_v2_data_pool(rbd_dev);
5895 if (ret)
5896 goto out_err;
5897 }
5898
263423f8 5899 rbd_init_layout(rbd_dev);
35152979 5900 return 0;
263423f8 5901
9d475de5 5902out_err:
642a2537 5903 rbd_dev->header.features = 0;
1e130199
AE
5904 kfree(rbd_dev->header.object_prefix);
5905 rbd_dev->header.object_prefix = NULL;
9d475de5 5906 return ret;
a30b71b9
AE
5907}
5908
6d69bb53
ID
5909/*
5910 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5911 * rbd_dev_image_probe() recursion depth, which means it's also the
5912 * length of the already discovered part of the parent chain.
5913 */
5914static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
83a06263 5915{
2f82ee54 5916 struct rbd_device *parent = NULL;
124afba2
AE
5917 int ret;
5918
5919 if (!rbd_dev->parent_spec)
5920 return 0;
124afba2 5921
6d69bb53
ID
5922 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5923 pr_info("parent chain is too long (%d)\n", depth);
5924 ret = -EINVAL;
5925 goto out_err;
5926 }
5927
1643dfa4 5928 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
1f2c6651
ID
5929 if (!parent) {
5930 ret = -ENOMEM;
124afba2 5931 goto out_err;
1f2c6651
ID
5932 }
5933
5934 /*
5935 * Images related by parent/child relationships always share
5936 * rbd_client and spec/parent_spec, so bump their refcounts.
5937 */
5938 __rbd_get_client(rbd_dev->rbd_client);
5939 rbd_spec_get(rbd_dev->parent_spec);
124afba2 5940
6d69bb53 5941 ret = rbd_dev_image_probe(parent, depth);
124afba2
AE
5942 if (ret < 0)
5943 goto out_err;
1f2c6651 5944
124afba2 5945 rbd_dev->parent = parent;
a2acd00e 5946 atomic_set(&rbd_dev->parent_ref, 1);
124afba2 5947 return 0;
1f2c6651 5948
124afba2 5949out_err:
1f2c6651 5950 rbd_dev_unparent(rbd_dev);
1761b229 5951 rbd_dev_destroy(parent);
124afba2
AE
5952 return ret;
5953}
5954
5769ed0c
ID
5955static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5956{
5957 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5958 rbd_dev_mapping_clear(rbd_dev);
5959 rbd_free_disk(rbd_dev);
5960 if (!single_major)
5961 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5962}
5963
811c6688
ID
5964/*
5965 * rbd_dev->header_rwsem must be locked for write and will be unlocked
5966 * upon return.
5967 */
200a6a8b 5968static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 5969{
83a06263 5970 int ret;
d1cf5788 5971
9b60e70b 5972 /* Record our major and minor device numbers. */
83a06263 5973
9b60e70b
ID
5974 if (!single_major) {
5975 ret = register_blkdev(0, rbd_dev->name);
5976 if (ret < 0)
1643dfa4 5977 goto err_out_unlock;
9b60e70b
ID
5978
5979 rbd_dev->major = ret;
5980 rbd_dev->minor = 0;
5981 } else {
5982 rbd_dev->major = rbd_major;
5983 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5984 }
83a06263
AE
5985
5986 /* Set up the blkdev mapping. */
5987
5988 ret = rbd_init_disk(rbd_dev);
5989 if (ret)
5990 goto err_out_blkdev;
5991
f35a4dee 5992 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
5993 if (ret)
5994 goto err_out_disk;
bc1ecc65 5995
f35a4dee 5996 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
22001f61 5997 set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
f35a4dee 5998
5769ed0c 5999 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
f35a4dee 6000 if (ret)
f5ee37bd 6001 goto err_out_mapping;
83a06263 6002
129b79d4 6003 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
811c6688 6004 up_write(&rbd_dev->header_rwsem);
5769ed0c 6005 return 0;
2f82ee54 6006
f35a4dee
AE
6007err_out_mapping:
6008 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
6009err_out_disk:
6010 rbd_free_disk(rbd_dev);
6011err_out_blkdev:
9b60e70b
ID
6012 if (!single_major)
6013 unregister_blkdev(rbd_dev->major, rbd_dev->name);
811c6688
ID
6014err_out_unlock:
6015 up_write(&rbd_dev->header_rwsem);
83a06263
AE
6016 return ret;
6017}
6018
332bb12d
AE
6019static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6020{
6021 struct rbd_spec *spec = rbd_dev->spec;
c41d13a3 6022 int ret;
332bb12d
AE
6023
6024 /* Record the header object name for this rbd image. */
6025
6026 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
332bb12d 6027 if (rbd_dev->image_format == 1)
c41d13a3
ID
6028 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6029 spec->image_name, RBD_SUFFIX);
332bb12d 6030 else
c41d13a3
ID
6031 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6032 RBD_HEADER_PREFIX, spec->image_id);
332bb12d 6033
c41d13a3 6034 return ret;
332bb12d
AE
6035}
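
/*
 * Illustrative results, assuming RBD_SUFFIX is ".rbd" and
 * RBD_HEADER_PREFIX is "rbd_header.": a format 1 image named "foo"
 * uses header object "foo.rbd", while a format 2 image with id
 * "abc123" uses "rbd_header.abc123".
 */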
6036
200a6a8b
AE
6037static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6038{
6fd48b3b 6039 rbd_dev_unprobe(rbd_dev);
fd22aef8
ID
6040 if (rbd_dev->opts)
6041 rbd_unregister_watch(rbd_dev);
6fd48b3b
AE
6042 rbd_dev->image_format = 0;
6043 kfree(rbd_dev->spec->image_id);
6044 rbd_dev->spec->image_id = NULL;
200a6a8b
AE
6045}
6046
a30b71b9
AE
6047/*
6048 * Probe for the existence of the header object for the given rbd
1f3ef788
AE
6049 * device. If this image is the one being mapped (i.e., not a
6050 * parent), initiate a watch on its header object before using that
6051 * object to get detailed information about the rbd image.
a30b71b9 6052 */
6d69bb53 6053static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
a30b71b9
AE
6054{
6055 int ret;
6056
6057 /*
3abef3b3
AE
6058 * Get the id from the image id object. Unless there's an
6059 * error, rbd_dev->spec->image_id will be filled in with
6060 * a dynamically-allocated string, and rbd_dev->image_format
6061 * will be set to either 1 or 2.
a30b71b9
AE
6062 */
6063 ret = rbd_dev_image_id(rbd_dev);
6064 if (ret)
c0fba368 6065 return ret;
c0fba368 6066
332bb12d
AE
6067 ret = rbd_dev_header_name(rbd_dev);
6068 if (ret)
6069 goto err_out_format;
6070
6d69bb53 6071 if (!depth) {
99d16943 6072 ret = rbd_register_watch(rbd_dev);
1fe48023
ID
6073 if (ret) {
6074 if (ret == -ENOENT)
6075 pr_info("image %s/%s does not exist\n",
6076 rbd_dev->spec->pool_name,
6077 rbd_dev->spec->image_name);
c41d13a3 6078 goto err_out_format;
1fe48023 6079 }
1f3ef788 6080 }
b644de2b 6081
a720ae09 6082 ret = rbd_dev_header_info(rbd_dev);
5655c4d9 6083 if (ret)
b644de2b 6084 goto err_out_watch;
83a06263 6085
04077599
ID
6086 /*
6087 * If this image is the one being mapped, we have pool name and
6088 * id, image name and id, and snap name - need to fill snap id.
6089 * Otherwise this is a parent image, identified by pool, image
6090 * and snap ids - need to fill in names for those ids.
6091 */
6d69bb53 6092 if (!depth)
04077599
ID
6093 ret = rbd_spec_fill_snap_id(rbd_dev);
6094 else
6095 ret = rbd_spec_fill_names(rbd_dev);
1fe48023
ID
6096 if (ret) {
6097 if (ret == -ENOENT)
6098 pr_info("snap %s/%s@%s does not exist\n",
6099 rbd_dev->spec->pool_name,
6100 rbd_dev->spec->image_name,
6101 rbd_dev->spec->snap_name);
33dca39f 6102 goto err_out_probe;
1fe48023 6103 }
9bb81c9b 6104
e8f59b59
ID
6105 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6106 ret = rbd_dev_v2_parent_info(rbd_dev);
6107 if (ret)
6108 goto err_out_probe;
6109
6110 /*
6111 * Need to warn users if this image is the one being
6112 * mapped and has a parent.
6113 */
6d69bb53 6114 if (!depth && rbd_dev->parent_spec)
e8f59b59
ID
6115 rbd_warn(rbd_dev,
6116 "WARNING: kernel layering is EXPERIMENTAL!");
6117 }
6118
6d69bb53 6119 ret = rbd_dev_probe_parent(rbd_dev, depth);
30d60ba2
AE
6120 if (ret)
6121 goto err_out_probe;
6122
6123 dout("discovered format %u image, header name is %s\n",
c41d13a3 6124 rbd_dev->image_format, rbd_dev->header_oid.name);
30d60ba2 6125 return 0;
e8f59b59 6126
6fd48b3b
AE
6127err_out_probe:
6128 rbd_dev_unprobe(rbd_dev);
b644de2b 6129err_out_watch:
6d69bb53 6130 if (!depth)
99d16943 6131 rbd_unregister_watch(rbd_dev);
332bb12d
AE
6132err_out_format:
6133 rbd_dev->image_format = 0;
5655c4d9
AE
6134 kfree(rbd_dev->spec->image_id);
6135 rbd_dev->spec->image_id = NULL;
a30b71b9
AE
6136 return ret;
6137}
6138
9b60e70b
ID
6139static ssize_t do_rbd_add(struct bus_type *bus,
6140 const char *buf,
6141 size_t count)
602adf40 6142{
cb8627c7 6143 struct rbd_device *rbd_dev = NULL;
dc79b113 6144 struct ceph_options *ceph_opts = NULL;
4e9afeba 6145 struct rbd_options *rbd_opts = NULL;
859c31df 6146 struct rbd_spec *spec = NULL;
9d3997fd 6147 struct rbd_client *rbdc;
51344a38 6148 bool read_only;
b51c83c2 6149 int rc;
602adf40
YS
6150
6151 if (!try_module_get(THIS_MODULE))
6152 return -ENODEV;
6153
602adf40 6154 /* parse add command */
859c31df 6155 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 6156 if (rc < 0)
dd5ac32d 6157 goto out;
78cea76e 6158
9d3997fd
AE
6159 rbdc = rbd_get_client(ceph_opts);
6160 if (IS_ERR(rbdc)) {
6161 rc = PTR_ERR(rbdc);
0ddebc0c 6162 goto err_out_args;
9d3997fd 6163 }
602adf40 6164
602adf40 6165 /* pick the pool */
30ba1f02 6166 rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
1fe48023
ID
6167 if (rc < 0) {
6168 if (rc == -ENOENT)
6169 pr_info("pool %s does not exist\n", spec->pool_name);
602adf40 6170 goto err_out_client;
1fe48023 6171 }
c0cd10db 6172 spec->pool_id = (u64)rc;
859c31df 6173
d147543d 6174 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
b51c83c2
ID
6175 if (!rbd_dev) {
6176 rc = -ENOMEM;
bd4ba655 6177 goto err_out_client;
b51c83c2 6178 }
c53d5893
AE
6179 rbdc = NULL; /* rbd_dev now owns this */
6180 spec = NULL; /* rbd_dev now owns this */
d147543d 6181 rbd_opts = NULL; /* rbd_dev now owns this */
602adf40 6182
0d6d1e9c
MC
6183 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6184 if (!rbd_dev->config_info) {
6185 rc = -ENOMEM;
6186 goto err_out_rbd_dev;
6187 }
6188
811c6688 6189 down_write(&rbd_dev->header_rwsem);
6d69bb53 6190 rc = rbd_dev_image_probe(rbd_dev, 0);
0d6d1e9c
MC
6191 if (rc < 0) {
6192 up_write(&rbd_dev->header_rwsem);
c53d5893 6193 goto err_out_rbd_dev;
0d6d1e9c 6194 }
05fd6f6f 6195
7ce4eef7
AE
6196 /* If we are mapping a snapshot it must be marked read-only */
6197
d147543d 6198 read_only = rbd_dev->opts->read_only;
7ce4eef7
AE
6199 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6200 read_only = true;
6201 rbd_dev->mapping.read_only = read_only;
6202
b536f69a 6203 rc = rbd_dev_device_setup(rbd_dev);
fd22aef8 6204 if (rc)
8b679ec5 6205 goto err_out_image_probe;
3abef3b3 6206
e010dd0a
ID
6207 if (rbd_dev->opts->exclusive) {
6208 rc = rbd_add_acquire_lock(rbd_dev);
6209 if (rc)
6210 goto err_out_device_setup;
3abef3b3
AE
6211 }
6212
5769ed0c
ID
6213 /* Everything's ready. Announce the disk to the world. */
6214
6215 rc = device_add(&rbd_dev->dev);
6216 if (rc)
e010dd0a 6217 goto err_out_image_lock;
5769ed0c
ID
6218
6219 add_disk(rbd_dev->disk);
6220 /* see rbd_init_disk() */
6221 blk_put_queue(rbd_dev->disk->queue);
6222
6223 spin_lock(&rbd_dev_list_lock);
6224 list_add_tail(&rbd_dev->node, &rbd_dev_list);
6225 spin_unlock(&rbd_dev_list_lock);
6226
6227 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6228 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6229 rbd_dev->header.features);
dd5ac32d
ID
6230 rc = count;
6231out:
6232 module_put(THIS_MODULE);
6233 return rc;
b536f69a 6234
e010dd0a
ID
6235err_out_image_lock:
6236 rbd_dev_image_unlock(rbd_dev);
5769ed0c
ID
6237err_out_device_setup:
6238 rbd_dev_device_release(rbd_dev);
8b679ec5
ID
6239err_out_image_probe:
6240 rbd_dev_image_release(rbd_dev);
c53d5893
AE
6241err_out_rbd_dev:
6242 rbd_dev_destroy(rbd_dev);
bd4ba655 6243err_out_client:
9d3997fd 6244 rbd_put_client(rbdc);
0ddebc0c 6245err_out_args:
859c31df 6246 rbd_spec_put(spec);
d147543d 6247 kfree(rbd_opts);
dd5ac32d 6248 goto out;
602adf40
YS
6249}
6250
9b60e70b
ID
6251static ssize_t rbd_add(struct bus_type *bus,
6252 const char *buf,
6253 size_t count)
6254{
6255 if (single_major)
6256 return -EINVAL;
6257
6258 return do_rbd_add(bus, buf, count);
6259}
6260
6261static ssize_t rbd_add_single_major(struct bus_type *bus,
6262 const char *buf,
6263 size_t count)
6264{
6265 return do_rbd_add(bus, buf, count);
6266}
6267
05a46afd
AE
6268static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6269{
ad945fc1 6270 while (rbd_dev->parent) {
05a46afd
AE
6271 struct rbd_device *first = rbd_dev;
6272 struct rbd_device *second = first->parent;
6273 struct rbd_device *third;
6274
6275 /*
6276 * Follow to the parent with no grandparent and
6277 * remove it.
6278 */
6279 while (second && (third = second->parent)) {
6280 first = second;
6281 second = third;
6282 }
ad945fc1 6283 rbd_assert(second);
8ad42cd0 6284 rbd_dev_image_release(second);
8b679ec5 6285 rbd_dev_destroy(second);
ad945fc1
AE
6286 first->parent = NULL;
6287 first->parent_overlap = 0;
6288
6289 rbd_assert(first->parent_spec);
05a46afd
AE
6290 rbd_spec_put(first->parent_spec);
6291 first->parent_spec = NULL;
05a46afd
AE
6292 }
6293}
6294
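/*
 * A hypothetical example of the buffer parsed below, written to
 * /sys/bus/rbd/remove: "0" unmaps the device with id 0, while
 * "0 force" also unmaps it while still open, failing any outstanding
 * I/O.
 */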
9b60e70b
ID
6295static ssize_t do_rbd_remove(struct bus_type *bus,
6296 const char *buf,
6297 size_t count)
602adf40
YS
6298{
6299 struct rbd_device *rbd_dev = NULL;
751cc0e3
AE
6300 struct list_head *tmp;
6301 int dev_id;
0276dca6 6302 char opt_buf[6];
82a442d2 6303 bool already = false;
0276dca6 6304 bool force = false;
0d8189e1 6305 int ret;
602adf40 6306
0276dca6
MC
6307 dev_id = -1;
6308 opt_buf[0] = '\0';
6309 sscanf(buf, "%d %5s", &dev_id, opt_buf);
6310 if (dev_id < 0) {
6311 pr_err("dev_id out of range\n");
602adf40 6312 return -EINVAL;
0276dca6
MC
6313 }
6314 if (opt_buf[0] != '\0') {
6315 if (!strcmp(opt_buf, "force")) {
6316 force = true;
6317 } else {
6318 pr_err("bad remove option at '%s'\n", opt_buf);
6319 return -EINVAL;
6320 }
6321 }
602adf40 6322
751cc0e3
AE
6323 ret = -ENOENT;
6324 spin_lock(&rbd_dev_list_lock);
6325 list_for_each(tmp, &rbd_dev_list) {
6326 rbd_dev = list_entry(tmp, struct rbd_device, node);
6327 if (rbd_dev->dev_id == dev_id) {
6328 ret = 0;
6329 break;
6330 }
42382b70 6331 }
751cc0e3
AE
6332 if (!ret) {
6333 spin_lock_irq(&rbd_dev->lock);
0276dca6 6334 if (rbd_dev->open_count && !force)
751cc0e3
AE
6335 ret = -EBUSY;
6336 else
82a442d2
AE
6337 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6338 &rbd_dev->flags);
751cc0e3
AE
6339 spin_unlock_irq(&rbd_dev->lock);
6340 }
6341 spin_unlock(&rbd_dev_list_lock);
82a442d2 6342 if (ret < 0 || already)
1ba0f1e7 6343 return ret;
751cc0e3 6344
0276dca6
MC
6345 if (force) {
6346 /*
6347 * Prevent new IO from being queued and wait for existing
6348 * IO to complete/fail.
6349 */
6350 blk_mq_freeze_queue(rbd_dev->disk->queue);
6351 blk_set_queue_dying(rbd_dev->disk->queue);
6352 }
6353
5769ed0c
ID
6354 del_gendisk(rbd_dev->disk);
6355 spin_lock(&rbd_dev_list_lock);
6356 list_del_init(&rbd_dev->node);
6357 spin_unlock(&rbd_dev_list_lock);
6358 device_del(&rbd_dev->dev);
fca27065 6359
e010dd0a 6360 rbd_dev_image_unlock(rbd_dev);
dd5ac32d 6361 rbd_dev_device_release(rbd_dev);
8ad42cd0 6362 rbd_dev_image_release(rbd_dev);
8b679ec5 6363 rbd_dev_destroy(rbd_dev);
1ba0f1e7 6364 return count;
602adf40
YS
6365}
6366
9b60e70b
ID
6367static ssize_t rbd_remove(struct bus_type *bus,
6368 const char *buf,
6369 size_t count)
6370{
6371 if (single_major)
6372 return -EINVAL;
6373
6374 return do_rbd_remove(bus, buf, count);
6375}
6376
6377static ssize_t rbd_remove_single_major(struct bus_type *bus,
6378 const char *buf,
6379 size_t count)
6380{
6381 return do_rbd_remove(bus, buf, count);
6382}
6383
602adf40
YS
6384/*
6385 * create control files in sysfs
dfc5606d 6386 * /sys/bus/rbd/...
602adf40
YS
6387 */
6388static int rbd_sysfs_init(void)
6389{
dfc5606d 6390 int ret;
602adf40 6391
fed4c143 6392 ret = device_register(&rbd_root_dev);
21079786 6393 if (ret < 0)
dfc5606d 6394 return ret;
602adf40 6395
fed4c143
AE
6396 ret = bus_register(&rbd_bus_type);
6397 if (ret < 0)
6398 device_unregister(&rbd_root_dev);
602adf40 6399
602adf40
YS
6400 return ret;
6401}
6402
6403static void rbd_sysfs_cleanup(void)
6404{
dfc5606d 6405 bus_unregister(&rbd_bus_type);
fed4c143 6406 device_unregister(&rbd_root_dev);
602adf40
YS
6407}
6408
1c2a9dfe
AE
6409static int rbd_slab_init(void)
6410{
6411 rbd_assert(!rbd_img_request_cache);
03d94406 6412 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
868311b1
AE
6413 if (!rbd_img_request_cache)
6414 return -ENOMEM;
6415
6416 rbd_assert(!rbd_obj_request_cache);
03d94406 6417 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
78c2a44a
AE
6418 if (!rbd_obj_request_cache)
6419 goto out_err;
6420
f856dc36
N
6421 rbd_assert(!rbd_bio_clone);
6422 rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0);
6423 if (!rbd_bio_clone)
6424 goto out_err_clone;
6425
6c696d85 6426 return 0;
1c2a9dfe 6427
f856dc36
N
6428out_err_clone:
6429 kmem_cache_destroy(rbd_obj_request_cache);
6430 rbd_obj_request_cache = NULL;
6c696d85 6431out_err:
868311b1
AE
6432 kmem_cache_destroy(rbd_img_request_cache);
6433 rbd_img_request_cache = NULL;
1c2a9dfe
AE
6434 return -ENOMEM;
6435}
6436
6437static void rbd_slab_exit(void)
6438{
868311b1
AE
6439 rbd_assert(rbd_obj_request_cache);
6440 kmem_cache_destroy(rbd_obj_request_cache);
6441 rbd_obj_request_cache = NULL;
6442
1c2a9dfe
AE
6443 rbd_assert(rbd_img_request_cache);
6444 kmem_cache_destroy(rbd_img_request_cache);
6445 rbd_img_request_cache = NULL;
f856dc36
N
6446
6447 rbd_assert(rbd_bio_clone);
6448 bioset_free(rbd_bio_clone);
6449 rbd_bio_clone = NULL;
1c2a9dfe
AE
6450}
6451
cc344fa1 6452static int __init rbd_init(void)
602adf40
YS
6453{
6454 int rc;
6455
1e32d34c
AE
6456 if (!libceph_compatible(NULL)) {
6457 rbd_warn(NULL, "libceph incompatibility (quitting)");
1e32d34c
AE
6458 return -EINVAL;
6459 }
e1b4d96d 6460
1c2a9dfe 6461 rc = rbd_slab_init();
602adf40
YS
6462 if (rc)
6463 return rc;
e1b4d96d 6464
f5ee37bd
ID
6465 /*
6466 * The number of active work items is limited by the number of
f77303bd 6467 * rbd devices * queue depth, so leave @max_active at default.
f5ee37bd
ID
6468 */
6469 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6470 if (!rbd_wq) {
6471 rc = -ENOMEM;
6472 goto err_out_slab;
6473 }
6474
9b60e70b
ID
6475 if (single_major) {
6476 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6477 if (rbd_major < 0) {
6478 rc = rbd_major;
f5ee37bd 6479 goto err_out_wq;
9b60e70b
ID
6480 }
6481 }
6482
1c2a9dfe
AE
6483 rc = rbd_sysfs_init();
6484 if (rc)
9b60e70b
ID
6485 goto err_out_blkdev;
6486
6487 if (single_major)
6488 pr_info("loaded (major %d)\n", rbd_major);
6489 else
6490 pr_info("loaded\n");
1c2a9dfe 6491
e1b4d96d
ID
6492 return 0;
6493
9b60e70b
ID
6494err_out_blkdev:
6495 if (single_major)
6496 unregister_blkdev(rbd_major, RBD_DRV_NAME);
f5ee37bd
ID
6497err_out_wq:
6498 destroy_workqueue(rbd_wq);
e1b4d96d
ID
6499err_out_slab:
6500 rbd_slab_exit();
1c2a9dfe 6501 return rc;
602adf40
YS
6502}
6503
cc344fa1 6504static void __exit rbd_exit(void)
602adf40 6505{
ffe312cf 6506 ida_destroy(&rbd_dev_id_ida);
602adf40 6507 rbd_sysfs_cleanup();
9b60e70b
ID
6508 if (single_major)
6509 unregister_blkdev(rbd_major, RBD_DRV_NAME);
f5ee37bd 6510 destroy_workqueue(rbd_wq);
1c2a9dfe 6511 rbd_slab_exit();
602adf40
YS
6512}
6513
6514module_init(rbd_init);
6515module_exit(rbd_exit);
6516
d552c619 6517MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
602adf40
YS
6518MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6519MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
602adf40
YS
6520/* following authorship retained from original osdblk.c */
6521MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6522
90da258b 6523MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
602adf40 6524MODULE_LICENSE("GPL");