/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}
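/*
 * Note: these saturating helpers are meant for reference counts that must
 * never wrap or be revived from zero (they are used later for things like
 * rbd_dev->parent_ref); hitting the limit is reported as -EINVAL instead
 * of silently overflowing.
 */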

#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX	/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2)
#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
	OBJ_OP_WRITE,
	OBJ_OP_READ,
	OBJ_OP_DISCARD,
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64                     size;
	u64                     features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * Default to false for now, as single-major requires >= 0.75 version of
 * userspace rbd utility.
 */
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
static void rbd_spec_put(struct rbd_spec *spec);

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
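/*
 * In single-major mode each rbd device owns a block of
 * 1 << RBD_SINGLE_MAJOR_PART_SHIFT (i.e. 16) minors.  For example,
 * dev_id 3 maps to minor 48, and minors 48..63 cover that device and
 * its partitions; the reverse mapping simply shifts the low bits away.
 */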

static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
{
	return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
	       rbd_dev->spec->snap_id == CEPH_NOSNAP &&
	       !rbd_dev->mapping.read_only;
}

static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ret = 0;
	int val;
	bool ro;
	bool ro_changed = false;

	/* get_user() may sleep, so call it before taking rbd_dev->lock */
	if (get_user(val, (int __user *)(arg)))
		return -EFAULT;

	ro = val ? true : false;
	/* Snapshot doesn't allow to write*/
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	/* prevent others open this device */
	if (rbd_dev->open_count > 1) {
		ret = -EBUSY;
		goto out;
	}

	if (rbd_dev->mapping.read_only != ro) {
		rbd_dev->mapping.read_only = ro;
		ro_changed = true;
	}

out:
	spin_unlock_irq(&rbd_dev->lock);
	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
	if (ret == 0 && ro_changed)
		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

	return ret;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret = 0;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_err, NULL}
};

struct rbd_options {
	int	queue_depth;
	bool	read_only;
	bool	lock_on_read;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		rbd_opts->queue_depth = intval;
		break;
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	case Opt_lock_on_read:
		rbd_opts->lock_on_read = true;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}
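/*
 * For example, a map option string such as "queue_depth=128,ro,lock_on_read"
 * is handed to parse_rbd_opts_token() one token at a time, leaving the
 * rbd_options with queue_depth = 128 and both read_only and lock_on_read
 * set to true; an unrecognized token falls through to the default case
 * and fails the map with -EINVAL.
 */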

static char* obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	default:
		return "???";
	}
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&client_mutex);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire the snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* The rest aren't used for format 1 images */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is in kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
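/*
 * Example: with snapc->snaps holding the ids {12, 7, 3} (descending
 * order as kept by the OSD), rbd_dev_snap_index() returns 1 for
 * snap_id 7 and BAD_SNAP_INDEX for snap_id 5.
 */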

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;
	char *name_format;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	name_format = "%s.%012llx";
	if (rbd_dev->image_format == 2)
		name_format = "%s.%016llx";
	ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		rbd_segment_name_free(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
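/*
 * Example: with an object order of 22 (4 MiB objects, the usual rbd
 * default) on a format 2 image, an image offset of 10 MiB falls in
 * segment 2 at offset 2 MiB within that object, and rbd_segment_name()
 * produces "<object_prefix>.0000000000000002".
 */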

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec bv;
	struct bvec_iter iter;
	unsigned long flags;
	void *buf;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, iter) {
			if (pos + bv.bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(&bv, &flags);
				memset(buf + remainder, 0,
				       bv.bv_len - remainder);
				flush_dcache_page(bv.bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv.bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bio;

	bio = bio_clone(bio_src, gfpmask);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio_advance(bio, offset);
	bio->bi_iter.bi_size = len;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_iter.bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_iter.bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the response from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;

	return obj_request->img_offset <
	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
}
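/*
 * Example: with 4 MiB objects and a parent_overlap of 6 MiB, the overlap
 * is rounded up to 8 MiB, so object requests starting below image offset
 * 8 MiB are treated as overlapping the parent and may need parent data.
 */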

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);

static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	dout("%s %p osd_req %p\n", __func__, obj_request, osd_req);
	if (obj_request_img_data_test(obj_request)) {
		WARN_ON(obj_request->callback != rbd_img_obj_callback);
		rbd_img_request_get(obj_request->img_request);
	}
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}

static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
{
	dout("%s %p\n", __func__, obj_request);
	ceph_osdc_cancel_request(obj_request->osd_req);
}

/*
 * Wait for an object request to complete.  If interrupted, cancel the
 * underlying osd request.
 *
 * @timeout: in jiffies, 0 means "wait forever"
 */
static int __rbd_obj_request_wait(struct rbd_obj_request *obj_request,
				  unsigned long timeout)
{
	long ret;

	dout("%s %p\n", __func__, obj_request);
	ret = wait_for_completion_interruptible_timeout(
					&obj_request->completion,
					ceph_timeout_jiffies(timeout));
	if (ret <= 0) {
		if (ret == 0)
			ret = -ETIMEDOUT;
		rbd_obj_request_end(obj_request);
	} else {
		ret = 0;
	}

	dout("%s %p ret %d\n", __func__, obj_request, (int)ret);
	return ret;
}

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	return __rbd_obj_request_wait(obj_request, 0);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never change thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

/*
 * Set the discard flag when the img_request is an discard request
 */
static void img_request_discard_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_DISCARD, &img_request->flags);
	smp_mb();
}

static bool img_request_discard_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static enum obj_operation_type
rbd_img_request_op_type(struct rbd_img_request *img_request)
{
	if (img_request_write_test(img_request))
		return OBJ_OP_WRITE;
	else if (img_request_discard_test(img_request))
		return OBJ_OP_DISCARD;
	else
		return OBJ_OP_READ;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the entire
	 * length of the request.  A short read also implies zero-fill
	 * to the end of the request.  An error requires the whole
	 * length of the request to be reported finished with an error
	 * to the block layer.  In each case we update the xferred
	 * count to indicate the whole request was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
	}
	obj_request->xferred = length;
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}
8b3e1a56
AE
1833
1834 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1835 obj_request, img_request, obj_request->result,
1836 obj_request->xferred, obj_request->length);
a9e8ba2c
AE
1837 if (layered && obj_request->result == -ENOENT &&
1838 obj_request->img_offset < rbd_dev->parent_overlap)
8b3e1a56
AE
1839 rbd_img_parent_read(obj_request);
1840 else if (img_request)
6e2a4505
AE
1841 rbd_img_obj_request_read_callback(obj_request);
1842 else
1843 obj_request_done_set(obj_request);
bf0d5f50
AE
1844}
1845
c47f9371 1846static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1847{
1b83bef2
SW
1848 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1849 obj_request->result, obj_request->length);
1850 /*
8b3e1a56
AE
1851 * There is no such thing as a successful short write. Set
1852 * it to our originally-requested length.
1b83bef2
SW
1853 */
1854 obj_request->xferred = obj_request->length;
07741308 1855 obj_request_done_set(obj_request);
bf0d5f50
AE
1856}
1857
90e98c52
GZ
1858static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1859{
1860 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1861 obj_request->result, obj_request->length);
1862 /*
1863 * There is no such thing as a successful short discard. Set
1864 * it to our originally-requested length.
1865 */
1866 obj_request->xferred = obj_request->length;
d0265de7
JD
1867 /* discarding a non-existent object is not a problem */
1868 if (obj_request->result == -ENOENT)
1869 obj_request->result = 0;
90e98c52
GZ
1870 obj_request_done_set(obj_request);
1871}
1872
fbfab539
AE
1873/*
1874 * For a simple stat call there's nothing to do. We'll do more if
1875 * this is part of a write sequence for a layered image.
1876 */
c47f9371 1877static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
fbfab539 1878{
37206ee5 1879 dout("%s: obj %p\n", __func__, obj_request);
fbfab539
AE
1880 obj_request_done_set(obj_request);
1881}
1882
2761713d
ID
1883static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1884{
1885 dout("%s: obj %p\n", __func__, obj_request);
1886
1887 if (obj_request_img_data_test(obj_request))
1888 rbd_osd_copyup_callback(obj_request);
1889 else
1890 obj_request_done_set(obj_request);
1891}
1892
85e084fe 1893static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
bf0d5f50
AE
1894{
1895 struct rbd_obj_request *obj_request = osd_req->r_priv;
bf0d5f50
AE
1896 u16 opcode;
1897
85e084fe 1898 dout("%s: osd_req %p\n", __func__, osd_req);
bf0d5f50 1899 rbd_assert(osd_req == obj_request->osd_req);
57acbaa7
AE
1900 if (obj_request_img_data_test(obj_request)) {
1901 rbd_assert(obj_request->img_request);
1902 rbd_assert(obj_request->which != BAD_WHICH);
1903 } else {
1904 rbd_assert(obj_request->which == BAD_WHICH);
1905 }
bf0d5f50 1906
1b83bef2
SW
1907 if (osd_req->r_result < 0)
1908 obj_request->result = osd_req->r_result;
bf0d5f50 1909
c47f9371
AE
1910 /*
1911 * We support a 64-bit length, but ultimately it has to be
7ad18afa
CH
1912 * passed to the block layer, which just supports a 32-bit
1913 * length field.
c47f9371 1914 */
7665d85b 1915 obj_request->xferred = osd_req->r_ops[0].outdata_len;
8b3e1a56 1916 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
0ccd5926 1917
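 /*
  * Dispatch on the opcode of the first op.  For hint+write requests
  * the first op is SETALLOCHINT, which falls through to the write
  * handling below.
  */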
79528734 1918 opcode = osd_req->r_ops[0].op;
bf0d5f50
AE
1919 switch (opcode) {
1920 case CEPH_OSD_OP_READ:
c47f9371 1921 rbd_osd_read_callback(obj_request);
bf0d5f50 1922 break;
0ccd5926 1923 case CEPH_OSD_OP_SETALLOCHINT:
e30b7577
ID
1924 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1925 osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
0ccd5926 1926 /* fall through */
bf0d5f50 1927 case CEPH_OSD_OP_WRITE:
e30b7577 1928 case CEPH_OSD_OP_WRITEFULL:
c47f9371 1929 rbd_osd_write_callback(obj_request);
bf0d5f50 1930 break;
fbfab539 1931 case CEPH_OSD_OP_STAT:
c47f9371 1932 rbd_osd_stat_callback(obj_request);
fbfab539 1933 break;
90e98c52
GZ
1934 case CEPH_OSD_OP_DELETE:
1935 case CEPH_OSD_OP_TRUNCATE:
1936 case CEPH_OSD_OP_ZERO:
1937 rbd_osd_discard_callback(obj_request);
1938 break;
36be9a76 1939 case CEPH_OSD_OP_CALL:
2761713d
ID
1940 rbd_osd_call_callback(obj_request);
1941 break;
bf0d5f50 1942 default:
9584d508 1943 rbd_warn(NULL, "%s: unsupported op %hu",
bf0d5f50
AE
1944 obj_request->object_name, (unsigned short) opcode);
1945 break;
1946 }
1947
07741308 1948 if (obj_request_done_test(obj_request))
bf0d5f50
AE
1949 rbd_obj_request_complete(obj_request);
1950}
1951
9d4df01f 1952static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
430c28c3 1953{
8c042b0d 1954 struct ceph_osd_request *osd_req = obj_request->osd_req;
430c28c3 1955
7c84883a
ID
1956 rbd_assert(obj_request_img_data_test(obj_request));
1957 osd_req->r_snapid = obj_request->img_request->snap_id;
9d4df01f
AE
1958}
1959
1960static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1961{
9d4df01f 1962 struct ceph_osd_request *osd_req = obj_request->osd_req;
9d4df01f 1963
bb873b53
ID
1964 osd_req->r_mtime = CURRENT_TIME;
1965 osd_req->r_data_offset = obj_request->offset;
430c28c3
AE
1966}
1967
0ccd5926
ID
1968/*
1969 * Create an osd request. A read request has one osd op (read).
1970 * A write request has either one (watch) or two (hint+write) osd ops.
1971 * (All rbd data writes are prefixed with an allocation hint op, but
1972 * technically osd watch is a write request, hence this distinction.)
1973 */
bf0d5f50
AE
1974static struct ceph_osd_request *rbd_osd_req_create(
1975 struct rbd_device *rbd_dev,
6d2940c8 1976 enum obj_operation_type op_type,
deb236b3 1977 unsigned int num_ops,
430c28c3 1978 struct rbd_obj_request *obj_request)
bf0d5f50 1979{
bf0d5f50
AE
1980 struct ceph_snap_context *snapc = NULL;
1981 struct ceph_osd_client *osdc;
1982 struct ceph_osd_request *osd_req;
bf0d5f50 1983
90e98c52
GZ
1984 if (obj_request_img_data_test(obj_request) &&
1985 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
6365d33a 1986 struct rbd_img_request *img_request = obj_request->img_request;
90e98c52
GZ
1987 if (op_type == OBJ_OP_WRITE) {
1988 rbd_assert(img_request_write_test(img_request));
1989 } else {
1990 rbd_assert(img_request_discard_test(img_request));
1991 }
6d2940c8 1992 snapc = img_request->snapc;
bf0d5f50
AE
1993 }
1994
6d2940c8 1995 rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
deb236b3
ID
1996
1997 /* Allocate and initialize the request, for the num_ops ops */
bf0d5f50
AE
1998
1999 osdc = &rbd_dev->rbd_client->client->osdc;
deb236b3 2000 osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
2224d879 2001 GFP_NOIO);
bf0d5f50 2002 if (!osd_req)
13d1ad16 2003 goto fail;
bf0d5f50 2004
90e98c52 2005 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
bf0d5f50 2006 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
430c28c3 2007 else
bf0d5f50 2008 osd_req->r_flags = CEPH_OSD_FLAG_READ;
bf0d5f50
AE
2009
2010 osd_req->r_callback = rbd_osd_req_callback;
2011 osd_req->r_priv = obj_request;
2012
7627151e 2013 osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
d30291b9
ID
2014 if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
2015 obj_request->object_name))
2016 goto fail;
bf0d5f50 2017
13d1ad16
ID
2018 if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
2019 goto fail;
2020
bf0d5f50 2021 return osd_req;
13d1ad16
ID
2022
2023fail:
2024 ceph_osdc_put_request(osd_req);
2025 return NULL;
bf0d5f50
AE
2026}
2027
0eefd470 2028/*
d3246fb0
JD
2029 * Create a copyup osd request based on the information in the object
2030 * request supplied. A copyup request has two or three osd ops, a
2031 * copyup method call, potentially a hint op, and a write or truncate
2032 * or zero op.
0eefd470
AE
2033 */
2034static struct ceph_osd_request *
2035rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
2036{
2037 struct rbd_img_request *img_request;
2038 struct ceph_snap_context *snapc;
2039 struct rbd_device *rbd_dev;
2040 struct ceph_osd_client *osdc;
2041 struct ceph_osd_request *osd_req;
d3246fb0 2042 int num_osd_ops = 3;
0eefd470
AE
2043
2044 rbd_assert(obj_request_img_data_test(obj_request));
2045 img_request = obj_request->img_request;
2046 rbd_assert(img_request);
d3246fb0
JD
2047 rbd_assert(img_request_write_test(img_request) ||
2048 img_request_discard_test(img_request));
0eefd470 2049
d3246fb0
JD
2050 if (img_request_discard_test(img_request))
2051 num_osd_ops = 2;
2052
2053 /* Allocate and initialize the request, for all the ops */
0eefd470
AE
2054
2055 snapc = img_request->snapc;
2056 rbd_dev = img_request->rbd_dev;
2057 osdc = &rbd_dev->rbd_client->client->osdc;
d3246fb0 2058 osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
2224d879 2059 false, GFP_NOIO);
0eefd470 2060 if (!osd_req)
13d1ad16 2061 goto fail;
0eefd470
AE
2062
2063 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
2064 osd_req->r_callback = rbd_osd_req_callback;
2065 osd_req->r_priv = obj_request;
2066
7627151e 2067 osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
d30291b9
ID
2068 if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
2069 obj_request->object_name))
2070 goto fail;
0eefd470 2071
13d1ad16
ID
2072 if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
2073 goto fail;
2074
0eefd470 2075 return osd_req;
13d1ad16
ID
2076
2077fail:
2078 ceph_osdc_put_request(osd_req);
2079 return NULL;
0eefd470
AE
2080}
2081
2082
bf0d5f50
AE
2083static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2084{
2085 ceph_osdc_put_request(osd_req);
2086}
2087
2088/* object_name is assumed to be a non-null pointer and NUL-terminated */
2089
2090static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
2091 u64 offset, u64 length,
2092 enum obj_request_type type)
2093{
2094 struct rbd_obj_request *obj_request;
2095 size_t size;
2096 char *name;
2097
2098 rbd_assert(obj_request_type_valid(type));
2099
2100 size = strlen(object_name) + 1;
5a60e876 2101 name = kmalloc(size, GFP_NOIO);
f907ad55 2102 if (!name)
bf0d5f50
AE
2103 return NULL;
2104
5a60e876 2105 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
f907ad55
AE
2106 if (!obj_request) {
2107 kfree(name);
2108 return NULL;
2109 }
2110
bf0d5f50
AE
2111 obj_request->object_name = memcpy(name, object_name, size);
2112 obj_request->offset = offset;
2113 obj_request->length = length;
926f9b3f 2114 obj_request->flags = 0;
bf0d5f50
AE
2115 obj_request->which = BAD_WHICH;
2116 obj_request->type = type;
2117 INIT_LIST_HEAD(&obj_request->links);
788e2df3 2118 init_completion(&obj_request->completion);
bf0d5f50
AE
2119 kref_init(&obj_request->kref);
2120
37206ee5
AE
2121 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
2122 offset, length, (int)type, obj_request);
2123
bf0d5f50
AE
2124 return obj_request;
2125}
2126
2127static void rbd_obj_request_destroy(struct kref *kref)
2128{
2129 struct rbd_obj_request *obj_request;
2130
2131 obj_request = container_of(kref, struct rbd_obj_request, kref);
2132
37206ee5
AE
2133 dout("%s: obj %p\n", __func__, obj_request);
2134
bf0d5f50
AE
2135 rbd_assert(obj_request->img_request == NULL);
2136 rbd_assert(obj_request->which == BAD_WHICH);
2137
2138 if (obj_request->osd_req)
2139 rbd_osd_req_destroy(obj_request->osd_req);
2140
2141 rbd_assert(obj_request_type_valid(obj_request->type));
2142 switch (obj_request->type) {
9969ebc5
AE
2143 case OBJ_REQUEST_NODATA:
2144 break; /* Nothing to do */
bf0d5f50
AE
2145 case OBJ_REQUEST_BIO:
2146 if (obj_request->bio_list)
2147 bio_chain_put(obj_request->bio_list);
2148 break;
788e2df3
AE
2149 case OBJ_REQUEST_PAGES:
2150 if (obj_request->pages)
2151 ceph_release_page_vector(obj_request->pages,
2152 obj_request->page_count);
2153 break;
bf0d5f50
AE
2154 }
2155
f907ad55 2156 kfree(obj_request->object_name);
868311b1
AE
2157 obj_request->object_name = NULL;
2158 kmem_cache_free(rbd_obj_request_cache, obj_request);
bf0d5f50
AE
2159}
2160
fb65d228
AE
2161/* It's OK to call this for a device with no parent */
2162
2163static void rbd_spec_put(struct rbd_spec *spec);
2164static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2165{
2166 rbd_dev_remove_parent(rbd_dev);
2167 rbd_spec_put(rbd_dev->parent_spec);
2168 rbd_dev->parent_spec = NULL;
2169 rbd_dev->parent_overlap = 0;
2170}
2171
a2acd00e
AE
2172/*
2173 * Parent image reference counting is used to determine when an
2174 * image's parent fields can be safely torn down--after there are no
2175 * more in-flight requests to the parent image. When the last
2176 * reference is dropped, cleaning them up is safe.
2177 */
2178static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2179{
2180 int counter;
2181
2182 if (!rbd_dev->parent_spec)
2183 return;
2184
2185 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2186 if (counter > 0)
2187 return;
2188
2189 /* Last reference; clean up parent data structures */
2190
2191 if (!counter)
2192 rbd_dev_unparent(rbd_dev);
2193 else
9584d508 2194 rbd_warn(rbd_dev, "parent reference underflow");
a2acd00e
AE
2195}
2196
2197/*
2198 * If an image has a non-zero parent overlap, get a reference to its
2199 * parent.
2200 *
2201 * Returns true if the rbd device has a parent with a non-zero
2202 * overlap and a reference for it was successfully taken, or
2203 * false otherwise.
2204 */
2205static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2206{
ae43e9d0 2207 int counter = 0;
a2acd00e
AE
2208
2209 if (!rbd_dev->parent_spec)
2210 return false;
2211
ae43e9d0
ID
2212 down_read(&rbd_dev->header_rwsem);
2213 if (rbd_dev->parent_overlap)
2214 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2215 up_read(&rbd_dev->header_rwsem);
a2acd00e
AE
2216
2217 if (counter < 0)
9584d508 2218 rbd_warn(rbd_dev, "parent reference overflow");
a2acd00e 2219
ae43e9d0 2220 return counter > 0;
a2acd00e
AE
2221}
2222
bf0d5f50
AE
2223/*
2224 * Caller is responsible for filling in the list of object requests
2225 * that comprises the image request, and the Linux request pointer
2226 * (if there is one).
2227 */
cc344fa1
AE
2228static struct rbd_img_request *rbd_img_request_create(
2229 struct rbd_device *rbd_dev,
bf0d5f50 2230 u64 offset, u64 length,
6d2940c8 2231 enum obj_operation_type op_type,
4e752f0a 2232 struct ceph_snap_context *snapc)
bf0d5f50
AE
2233{
2234 struct rbd_img_request *img_request;
bf0d5f50 2235
7a716aac 2236 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
bf0d5f50
AE
2237 if (!img_request)
2238 return NULL;
2239
bf0d5f50
AE
2240 img_request->rq = NULL;
2241 img_request->rbd_dev = rbd_dev;
2242 img_request->offset = offset;
2243 img_request->length = length;
0c425248 2244 img_request->flags = 0;
90e98c52
GZ
2245 if (op_type == OBJ_OP_DISCARD) {
2246 img_request_discard_set(img_request);
2247 img_request->snapc = snapc;
2248 } else if (op_type == OBJ_OP_WRITE) {
0c425248 2249 img_request_write_set(img_request);
4e752f0a 2250 img_request->snapc = snapc;
0c425248 2251 } else {
bf0d5f50 2252 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 2253 }
a2acd00e 2254 if (rbd_dev_parent_get(rbd_dev))
d0b2e944 2255 img_request_layered_set(img_request);
bf0d5f50
AE
2256 spin_lock_init(&img_request->completion_lock);
2257 img_request->next_completion = 0;
2258 img_request->callback = NULL;
a5a337d4 2259 img_request->result = 0;
bf0d5f50
AE
2260 img_request->obj_request_count = 0;
2261 INIT_LIST_HEAD(&img_request->obj_requests);
2262 kref_init(&img_request->kref);
2263
37206ee5 2264 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
6d2940c8 2265 obj_op_name(op_type), offset, length, img_request);
37206ee5 2266
bf0d5f50
AE
2267 return img_request;
2268}
2269
2270static void rbd_img_request_destroy(struct kref *kref)
2271{
2272 struct rbd_img_request *img_request;
2273 struct rbd_obj_request *obj_request;
2274 struct rbd_obj_request *next_obj_request;
2275
2276 img_request = container_of(kref, struct rbd_img_request, kref);
2277
37206ee5
AE
2278 dout("%s: img %p\n", __func__, img_request);
2279
bf0d5f50
AE
2280 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2281 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 2282 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 2283
a2acd00e
AE
2284 if (img_request_layered_test(img_request)) {
2285 img_request_layered_clear(img_request);
2286 rbd_dev_parent_put(img_request->rbd_dev);
2287 }
2288
bef95455
JD
2289 if (img_request_write_test(img_request) ||
2290 img_request_discard_test(img_request))
812164f8 2291 ceph_put_snap_context(img_request->snapc);
bf0d5f50 2292
1c2a9dfe 2293 kmem_cache_free(rbd_img_request_cache, img_request);
bf0d5f50
AE
2294}
2295
e93f3152
AE
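/*
 * Create a child image request directed at the parent image, covering
 * the given range.  The new request takes a reference on the object
 * request that triggered it; rbd_parent_request_destroy() drops that
 * reference again.
 */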
2296static struct rbd_img_request *rbd_parent_request_create(
2297 struct rbd_obj_request *obj_request,
2298 u64 img_offset, u64 length)
2299{
2300 struct rbd_img_request *parent_request;
2301 struct rbd_device *rbd_dev;
2302
2303 rbd_assert(obj_request->img_request);
2304 rbd_dev = obj_request->img_request->rbd_dev;
2305
4e752f0a 2306 parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
6d2940c8 2307 length, OBJ_OP_READ, NULL);
e93f3152
AE
2308 if (!parent_request)
2309 return NULL;
2310
2311 img_request_child_set(parent_request);
2312 rbd_obj_request_get(obj_request);
2313 parent_request->obj_request = obj_request;
2314
2315 return parent_request;
2316}
2317
2318static void rbd_parent_request_destroy(struct kref *kref)
2319{
2320 struct rbd_img_request *parent_request;
2321 struct rbd_obj_request *orig_request;
2322
2323 parent_request = container_of(kref, struct rbd_img_request, kref);
2324 orig_request = parent_request->obj_request;
2325
2326 parent_request->obj_request = NULL;
2327 rbd_obj_request_put(orig_request);
2328 img_request_child_clear(parent_request);
2329
2330 rbd_img_request_destroy(kref);
2331}
2332
1217857f
AE
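/*
 * Finish a single object request: report its result and transfer count
 * to the block layer (or, for a child image request, just note its
 * position).  Returns true if more object requests in the image request
 * remain to be completed.
 */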
2333static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2334{
6365d33a 2335 struct rbd_img_request *img_request;
1217857f
AE
2336 unsigned int xferred;
2337 int result;
8b3e1a56 2338 bool more;
1217857f 2339
6365d33a
AE
2340 rbd_assert(obj_request_img_data_test(obj_request));
2341 img_request = obj_request->img_request;
2342
1217857f
AE
2343 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2344 xferred = (unsigned int)obj_request->xferred;
2345 result = obj_request->result;
2346 if (result) {
2347 struct rbd_device *rbd_dev = img_request->rbd_dev;
6d2940c8
GZ
2348 enum obj_operation_type op_type;
2349
90e98c52
GZ
2350 if (img_request_discard_test(img_request))
2351 op_type = OBJ_OP_DISCARD;
2352 else if (img_request_write_test(img_request))
2353 op_type = OBJ_OP_WRITE;
2354 else
2355 op_type = OBJ_OP_READ;
1217857f 2356
9584d508 2357 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
6d2940c8
GZ
2358 obj_op_name(op_type), obj_request->length,
2359 obj_request->img_offset, obj_request->offset);
9584d508 2360 rbd_warn(rbd_dev, " result %d xferred %x",
1217857f
AE
2361 result, xferred);
2362 if (!img_request->result)
2363 img_request->result = result;
082a75da
ID
2364 /*
2365 * Need to end I/O on the entire obj_request worth of
2366 * bytes in case of error.
2367 */
2368 xferred = obj_request->length;
1217857f
AE
2369 }
2370
f1a4739f
AE
2371 /* Image object requests don't own their page array */
2372
2373 if (obj_request->type == OBJ_REQUEST_PAGES) {
2374 obj_request->pages = NULL;
2375 obj_request->page_count = 0;
2376 }
2377
8b3e1a56
AE
2378 if (img_request_child_test(img_request)) {
2379 rbd_assert(img_request->obj_request != NULL);
2380 more = obj_request->which < img_request->obj_request_count - 1;
2381 } else {
2382 rbd_assert(img_request->rq != NULL);
7ad18afa
CH
2383
2384 more = blk_update_request(img_request->rq, result, xferred);
2385 if (!more)
2386 __blk_mq_end_request(img_request->rq, result);
8b3e1a56
AE
2387 }
2388
2389 return more;
1217857f
AE
2390}
2391
2169238d
AE
2392static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2393{
2394 struct rbd_img_request *img_request;
2395 u32 which = obj_request->which;
2396 bool more = true;
2397
6365d33a 2398 rbd_assert(obj_request_img_data_test(obj_request));
2169238d
AE
2399 img_request = obj_request->img_request;
2400
2401 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2402 rbd_assert(img_request != NULL);
2169238d
AE
2403 rbd_assert(img_request->obj_request_count > 0);
2404 rbd_assert(which != BAD_WHICH);
2405 rbd_assert(which < img_request->obj_request_count);
2169238d
AE
2406
2407 spin_lock_irq(&img_request->completion_lock);
2408 if (which != img_request->next_completion)
2409 goto out;
2410
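 /*
  * Object requests are completed in order.  Starting at
  * next_completion, end every request that has finished and stop at
  * the first one that is still in flight.
  */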
2411 for_each_obj_request_from(img_request, obj_request) {
2169238d
AE
2412 rbd_assert(more);
2413 rbd_assert(which < img_request->obj_request_count);
2414
2415 if (!obj_request_done_test(obj_request))
2416 break;
1217857f 2417 more = rbd_img_obj_end_request(obj_request);
2169238d
AE
2418 which++;
2419 }
2420
2421 rbd_assert(more ^ (which == img_request->obj_request_count));
2422 img_request->next_completion = which;
2423out:
2424 spin_unlock_irq(&img_request->completion_lock);
0f2d5be7 2425 rbd_img_request_put(img_request);
2169238d
AE
2426
2427 if (!more)
2428 rbd_img_request_complete(img_request);
2429}
2430
3b434a2a
JD
2431/*
2432 * Add individual osd ops to the given ceph_osd_request and prepare
2433 * them for submission. num_ops is the current number of
2434 * osd operations already added to the osd request.
2435 */
2436static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2437 struct ceph_osd_request *osd_request,
2438 enum obj_operation_type op_type,
2439 unsigned int num_ops)
2440{
2441 struct rbd_img_request *img_request = obj_request->img_request;
2442 struct rbd_device *rbd_dev = img_request->rbd_dev;
2443 u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2444 u64 offset = obj_request->offset;
2445 u64 length = obj_request->length;
2446 u64 img_end;
2447 u16 opcode;
2448
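 /*
  * Choose the osd opcode.  A whole-object discard with no parent data
  * underneath it becomes DELETE; a discard that runs to the end of the
  * object (or of the image) becomes TRUNCATE; any other discard becomes
  * ZERO.  Whole-object writes use WRITEFULL, other writes use WRITE
  * (both preceded by an allocation hint), and everything else is READ.
  */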
2449 if (op_type == OBJ_OP_DISCARD) {
d3246fb0
JD
2450 if (!offset && length == object_size &&
2451 (!img_request_layered_test(img_request) ||
2452 !obj_request_overlaps_parent(obj_request))) {
3b434a2a
JD
2453 opcode = CEPH_OSD_OP_DELETE;
2454 } else if (offset + length == object_size) {
2455 opcode = CEPH_OSD_OP_TRUNCATE;
2456 } else {
2457 down_read(&rbd_dev->header_rwsem);
2458 img_end = rbd_dev->header.image_size;
2459 up_read(&rbd_dev->header_rwsem);
2460
2461 if (obj_request->img_offset + length == img_end)
2462 opcode = CEPH_OSD_OP_TRUNCATE;
2463 else
2464 opcode = CEPH_OSD_OP_ZERO;
2465 }
2466 } else if (op_type == OBJ_OP_WRITE) {
e30b7577
ID
2467 if (!offset && length == object_size)
2468 opcode = CEPH_OSD_OP_WRITEFULL;
2469 else
2470 opcode = CEPH_OSD_OP_WRITE;
3b434a2a
JD
2471 osd_req_op_alloc_hint_init(osd_request, num_ops,
2472 object_size, object_size);
2473 num_ops++;
2474 } else {
2475 opcode = CEPH_OSD_OP_READ;
2476 }
2477
7e868b6e 2478 if (opcode == CEPH_OSD_OP_DELETE)
144cba14 2479 osd_req_op_init(osd_request, num_ops, opcode, 0);
7e868b6e
ID
2480 else
2481 osd_req_op_extent_init(osd_request, num_ops, opcode,
2482 offset, length, 0, 0);
2483
3b434a2a
JD
2484 if (obj_request->type == OBJ_REQUEST_BIO)
2485 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2486 obj_request->bio_list, length);
2487 else if (obj_request->type == OBJ_REQUEST_PAGES)
2488 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2489 obj_request->pages, length,
2490 offset & ~PAGE_MASK, false, false);
2491
2492 /* Discards are also writes */
2493 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2494 rbd_osd_req_format_write(obj_request);
2495 else
2496 rbd_osd_req_format_read(obj_request);
2497}
2498
f1a4739f
AE
2499/*
2500 * Split up an image request into one or more object requests, each
2501 * to a different object. The "type" parameter indicates whether
2502 * "data_desc" is the pointer to the head of a list of bio
2503 * structures, or the base of a page array. In either case this
2504 * function assumes data_desc describes memory sufficient to hold
2505 * all data described by the image request.
2506 */
2507static int rbd_img_request_fill(struct rbd_img_request *img_request,
2508 enum obj_request_type type,
2509 void *data_desc)
bf0d5f50
AE
2510{
2511 struct rbd_device *rbd_dev = img_request->rbd_dev;
2512 struct rbd_obj_request *obj_request = NULL;
2513 struct rbd_obj_request *next_obj_request;
a158073c 2514 struct bio *bio_list = NULL;
f1a4739f 2515 unsigned int bio_offset = 0;
a158073c 2516 struct page **pages = NULL;
6d2940c8 2517 enum obj_operation_type op_type;
7da22d29 2518 u64 img_offset;
bf0d5f50 2519 u64 resid;
bf0d5f50 2520
f1a4739f
AE
2521 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2522 (int)type, data_desc);
37206ee5 2523
7da22d29 2524 img_offset = img_request->offset;
bf0d5f50 2525 resid = img_request->length;
4dda41d3 2526 rbd_assert(resid > 0);
3b434a2a 2527 op_type = rbd_img_request_op_type(img_request);
f1a4739f
AE
2528
2529 if (type == OBJ_REQUEST_BIO) {
2530 bio_list = data_desc;
4f024f37
KO
2531 rbd_assert(img_offset ==
2532 bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
90e98c52 2533 } else if (type == OBJ_REQUEST_PAGES) {
f1a4739f
AE
2534 pages = data_desc;
2535 }
2536
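 /*
  * Walk the image extent one rados object (segment) at a time, creating
  * an object request for each segment and handing it the matching slice
  * of the bio chain or page array.
  */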
bf0d5f50 2537 while (resid) {
2fa12320 2538 struct ceph_osd_request *osd_req;
bf0d5f50 2539 const char *object_name;
bf0d5f50
AE
2540 u64 offset;
2541 u64 length;
2542
7da22d29 2543 object_name = rbd_segment_name(rbd_dev, img_offset);
bf0d5f50
AE
2544 if (!object_name)
2545 goto out_unwind;
7da22d29
AE
2546 offset = rbd_segment_offset(rbd_dev, img_offset);
2547 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 2548 obj_request = rbd_obj_request_create(object_name,
f1a4739f 2549 offset, length, type);
78c2a44a
AE
2550 /* object request has its own copy of the object name */
2551 rbd_segment_name_free(object_name);
bf0d5f50
AE
2552 if (!obj_request)
2553 goto out_unwind;
62054da6 2554
03507db6
JD
2555 /*
2556 * set obj_request->img_request before creating the
2557 * osd_request so that it gets the right snapc
2558 */
2559 rbd_img_obj_request_add(img_request, obj_request);
bf0d5f50 2560
f1a4739f
AE
2561 if (type == OBJ_REQUEST_BIO) {
2562 unsigned int clone_size;
2563
2564 rbd_assert(length <= (u64)UINT_MAX);
2565 clone_size = (unsigned int)length;
2566 obj_request->bio_list =
2567 bio_chain_clone_range(&bio_list,
2568 &bio_offset,
2569 clone_size,
2224d879 2570 GFP_NOIO);
f1a4739f 2571 if (!obj_request->bio_list)
62054da6 2572 goto out_unwind;
90e98c52 2573 } else if (type == OBJ_REQUEST_PAGES) {
f1a4739f
AE
2574 unsigned int page_count;
2575
2576 obj_request->pages = pages;
2577 page_count = (u32)calc_pages_for(offset, length);
2578 obj_request->page_count = page_count;
2579 if ((offset + length) & ~PAGE_MASK)
2580 page_count--; /* more on last page */
2581 pages += page_count;
2582 }
bf0d5f50 2583
6d2940c8
GZ
2584 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2585 (op_type == OBJ_OP_WRITE) ? 2 : 1,
2586 obj_request);
2fa12320 2587 if (!osd_req)
62054da6 2588 goto out_unwind;
3b434a2a 2589
2fa12320 2590 obj_request->osd_req = osd_req;
2169238d 2591 obj_request->callback = rbd_img_obj_callback;
3b434a2a 2592 obj_request->img_offset = img_offset;
9d4df01f 2593
3b434a2a 2594 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
430c28c3 2595
7da22d29 2596 img_offset += length;
bf0d5f50
AE
2597 resid -= length;
2598 }
2599
2600 return 0;
2601
bf0d5f50
AE
2602out_unwind:
2603 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
42dd037c 2604 rbd_img_obj_request_del(img_request, obj_request);
bf0d5f50
AE
2605
2606 return -ENOMEM;
2607}
2608
0eefd470 2609static void
2761713d 2610rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
0eefd470
AE
2611{
2612 struct rbd_img_request *img_request;
2613 struct rbd_device *rbd_dev;
ebda6408 2614 struct page **pages;
0eefd470
AE
2615 u32 page_count;
2616
2761713d
ID
2617 dout("%s: obj %p\n", __func__, obj_request);
2618
d3246fb0
JD
2619 rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2620 obj_request->type == OBJ_REQUEST_NODATA);
0eefd470
AE
2621 rbd_assert(obj_request_img_data_test(obj_request));
2622 img_request = obj_request->img_request;
2623 rbd_assert(img_request);
2624
2625 rbd_dev = img_request->rbd_dev;
2626 rbd_assert(rbd_dev);
0eefd470 2627
ebda6408
AE
2628 pages = obj_request->copyup_pages;
2629 rbd_assert(pages != NULL);
0eefd470 2630 obj_request->copyup_pages = NULL;
ebda6408
AE
2631 page_count = obj_request->copyup_page_count;
2632 rbd_assert(page_count);
2633 obj_request->copyup_page_count = 0;
2634 ceph_release_page_vector(pages, page_count);
0eefd470
AE
2635
2636 /*
2637 * We want the transfer count to reflect the size of the
2638 * original write request. There is no such thing as a
2639 * successful short write, so if the request was successful
2640 * we can just set it to the originally-requested length.
2641 */
2642 if (!obj_request->result)
2643 obj_request->xferred = obj_request->length;
2644
2761713d 2645 obj_request_done_set(obj_request);
0eefd470
AE
2646}
2647
3d7efd18
AE
2648static void
2649rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2650{
2651 struct rbd_obj_request *orig_request;
0eefd470 2652 struct ceph_osd_request *osd_req;
0eefd470 2653 struct rbd_device *rbd_dev;
3d7efd18 2654 struct page **pages;
d3246fb0 2655 enum obj_operation_type op_type;
ebda6408 2656 u32 page_count;
bbea1c1a 2657 int img_result;
ebda6408 2658 u64 parent_length;
3d7efd18
AE
2659
2660 rbd_assert(img_request_child_test(img_request));
2661
2662 /* First get what we need from the image request */
2663
2664 pages = img_request->copyup_pages;
2665 rbd_assert(pages != NULL);
2666 img_request->copyup_pages = NULL;
ebda6408
AE
2667 page_count = img_request->copyup_page_count;
2668 rbd_assert(page_count);
2669 img_request->copyup_page_count = 0;
3d7efd18
AE
2670
2671 orig_request = img_request->obj_request;
2672 rbd_assert(orig_request != NULL);
b91f09f1 2673 rbd_assert(obj_request_type_valid(orig_request->type));
bbea1c1a 2674 img_result = img_request->result;
ebda6408 2675 parent_length = img_request->length;
fa355112 2676 rbd_assert(img_result || parent_length == img_request->xferred);
91c6febb 2677 rbd_img_request_put(img_request);
3d7efd18 2678
91c6febb
AE
2679 rbd_assert(orig_request->img_request);
2680 rbd_dev = orig_request->img_request->rbd_dev;
0eefd470 2681 rbd_assert(rbd_dev);
0eefd470 2682
bbea1c1a
AE
2683 /*
2684 * If the overlap has become 0 (most likely because the
2685 * image has been flattened) we need to free the pages
2686 * and re-submit the original write request.
2687 */
2688 if (!rbd_dev->parent_overlap) {
bbea1c1a 2689 ceph_release_page_vector(pages, page_count);
980917fc
ID
2690 rbd_obj_request_submit(orig_request);
2691 return;
bbea1c1a 2692 }
0eefd470 2693
bbea1c1a 2694 if (img_result)
0eefd470 2695 goto out_err;
0eefd470 2696
8785b1d4
AE
2697 /*
2698 * The original osd request is of no use to us any more.
0ccd5926 2699 * We need a new one that can hold the three ops in a copyup
8785b1d4
AE
2700 * request. Allocate the new copyup osd request for the
2701 * original request, and release the old one.
2702 */
bbea1c1a 2703 img_result = -ENOMEM;
0eefd470
AE
2704 osd_req = rbd_osd_req_create_copyup(orig_request);
2705 if (!osd_req)
2706 goto out_err;
8785b1d4 2707 rbd_osd_req_destroy(orig_request->osd_req);
0eefd470
AE
2708 orig_request->osd_req = osd_req;
2709 orig_request->copyup_pages = pages;
ebda6408 2710 orig_request->copyup_page_count = page_count;
3d7efd18 2711
0eefd470 2712 /* Initialize the copyup op */
3d7efd18 2713
0eefd470 2714 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
ebda6408 2715 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
0eefd470 2716 false, false);
3d7efd18 2717
d3246fb0 2718 /* Add the other op(s) */
0eefd470 2719
d3246fb0
JD
2720 op_type = rbd_img_request_op_type(orig_request->img_request);
2721 rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
0eefd470
AE
2722
2723 /* All set, send it off. */
2724
980917fc
ID
2725 rbd_obj_request_submit(orig_request);
2726 return;
2727
0eefd470 2728out_err:
fa355112 2729 ceph_release_page_vector(pages, page_count);
bbea1c1a 2730 orig_request->result = img_result;
0eefd470 2731 orig_request->xferred = 0;
4a17dadc 2732 rbd_img_request_get(orig_request->img_request);
0eefd470
AE
2733 obj_request_done_set(orig_request);
2734 rbd_obj_request_complete(orig_request);
3d7efd18
AE
2735}
2736
2737/*
2738 * Read from the parent image the range of data that covers the
2739 * entire target of the given object request. This is used for
2740 * satisfying a layered image write request when the target of an
2741 * object request from the image request does not exist.
2742 *
2743 * A page array big enough to hold the returned data is allocated
2744 * and supplied to rbd_img_request_fill() as the "data descriptor."
2745 * When the read completes, this page array will be transferred to
2746 * the original object request for the copyup operation.
2747 *
c2e82414
ID
2748 * If an error occurs, it is recorded as the result of the original
2749 * object request in rbd_img_obj_exists_callback().
3d7efd18
AE
2750 */
2751static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2752{
058aa991 2753 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
3d7efd18 2754 struct rbd_img_request *parent_request = NULL;
3d7efd18
AE
2755 u64 img_offset;
2756 u64 length;
2757 struct page **pages = NULL;
2758 u32 page_count;
2759 int result;
2760
3d7efd18
AE
2761 rbd_assert(rbd_dev->parent != NULL);
2762
2763 /*
2764 * Determine the byte range covered by the object in the
2765 * child image to which the original request was to be sent.
2766 */
2767 img_offset = obj_request->img_offset - obj_request->offset;
2768 length = (u64)1 << rbd_dev->header.obj_order;
2769
a9e8ba2c
AE
2770 /*
2771 * There is no defined parent data beyond the parent
2772 * overlap, so limit what we read at that boundary if
2773 * necessary.
2774 */
2775 if (img_offset + length > rbd_dev->parent_overlap) {
2776 rbd_assert(img_offset < rbd_dev->parent_overlap);
2777 length = rbd_dev->parent_overlap - img_offset;
2778 }
2779
3d7efd18
AE
2780 /*
2781 * Allocate a page array big enough to receive the data read
2782 * from the parent.
2783 */
2784 page_count = (u32)calc_pages_for(0, length);
2785 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2786 if (IS_ERR(pages)) {
2787 result = PTR_ERR(pages);
2788 pages = NULL;
2789 goto out_err;
2790 }
2791
2792 result = -ENOMEM;
e93f3152
AE
2793 parent_request = rbd_parent_request_create(obj_request,
2794 img_offset, length);
3d7efd18
AE
2795 if (!parent_request)
2796 goto out_err;
3d7efd18
AE
2797
2798 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2799 if (result)
2800 goto out_err;
058aa991 2801
3d7efd18 2802 parent_request->copyup_pages = pages;
ebda6408 2803 parent_request->copyup_page_count = page_count;
3d7efd18 2804 parent_request->callback = rbd_img_obj_parent_read_full_callback;
058aa991 2805
3d7efd18
AE
2806 result = rbd_img_request_submit(parent_request);
2807 if (!result)
2808 return 0;
2809
2810 parent_request->copyup_pages = NULL;
ebda6408 2811 parent_request->copyup_page_count = 0;
3d7efd18
AE
2812 parent_request->obj_request = NULL;
2813 rbd_obj_request_put(obj_request);
2814out_err:
2815 if (pages)
2816 ceph_release_page_vector(pages, page_count);
2817 if (parent_request)
2818 rbd_img_request_put(parent_request);
3d7efd18
AE
2819 return result;
2820}
2821
c5b5ef6c
AE
2822static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2823{
c5b5ef6c 2824 struct rbd_obj_request *orig_request;
638f5abe 2825 struct rbd_device *rbd_dev;
c5b5ef6c
AE
2826 int result;
2827
2828 rbd_assert(!obj_request_img_data_test(obj_request));
2829
2830 /*
2831 * All we need from the object request is the original
2832 * request and the result of the STAT op. Grab those, then
2833 * we're done with the request.
2834 */
2835 orig_request = obj_request->obj_request;
2836 obj_request->obj_request = NULL;
912c317d 2837 rbd_obj_request_put(orig_request);
c5b5ef6c
AE
2838 rbd_assert(orig_request);
2839 rbd_assert(orig_request->img_request);
2840
2841 result = obj_request->result;
2842 obj_request->result = 0;
2843
2844 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2845 obj_request, orig_request, result,
2846 obj_request->xferred, obj_request->length);
2847 rbd_obj_request_put(obj_request);
2848
638f5abe
AE
2849 /*
2850 * If the overlap has become 0 (most likely because the
980917fc
ID
2851 * image has been flattened) we need to re-submit the
2852 * original request.
638f5abe
AE
2853 */
2854 rbd_dev = orig_request->img_request->rbd_dev;
2855 if (!rbd_dev->parent_overlap) {
980917fc
ID
2856 rbd_obj_request_submit(orig_request);
2857 return;
638f5abe 2858 }
c5b5ef6c
AE
2859
2860 /*
2861 * Our only purpose here is to determine whether the object
2862 * exists, and we don't want to treat the non-existence as
2863 * an error. If something else comes back, transfer the
2864 * error to the original request and complete it now.
2865 */
2866 if (!result) {
2867 obj_request_existence_set(orig_request, true);
2868 } else if (result == -ENOENT) {
2869 obj_request_existence_set(orig_request, false);
c2e82414
ID
2870 } else {
2871 goto fail_orig_request;
c5b5ef6c
AE
2872 }
2873
2874 /*
2875 * Resubmit the original request now that we have recorded
2876 * whether the target object exists.
2877 */
c2e82414
ID
2878 result = rbd_img_obj_request_submit(orig_request);
2879 if (result)
2880 goto fail_orig_request;
2881
2882 return;
2883
2884fail_orig_request:
2885 orig_request->result = result;
2886 orig_request->xferred = 0;
4a17dadc 2887 rbd_img_request_get(orig_request->img_request);
c2e82414
ID
2888 obj_request_done_set(orig_request);
2889 rbd_obj_request_complete(orig_request);
c5b5ef6c
AE
2890}
2891
2892static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2893{
058aa991 2894 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
c5b5ef6c 2895 struct rbd_obj_request *stat_request;
710214e3 2896 struct page **pages;
c5b5ef6c
AE
2897 u32 page_count;
2898 size_t size;
2899 int ret;
2900
710214e3
ID
2901 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2902 OBJ_REQUEST_PAGES);
2903 if (!stat_request)
2904 return -ENOMEM;
2905
2906 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2907 stat_request);
2908 if (!stat_request->osd_req) {
2909 ret = -ENOMEM;
2910 goto fail_stat_request;
2911 }
2912
c5b5ef6c
AE
2913 /*
2914 * The response data for a STAT call consists of:
2915 * le64 length;
2916 * struct {
2917 * le32 tv_sec;
2918 * le32 tv_nsec;
2919 * } mtime;
2920 */
2921 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2922 page_count = (u32)calc_pages_for(0, size);
2923 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
710214e3
ID
2924 if (IS_ERR(pages)) {
2925 ret = PTR_ERR(pages);
2926 goto fail_stat_request;
2927 }
c5b5ef6c 2928
710214e3
ID
2929 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2930 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2931 false, false);
c5b5ef6c
AE
2932
2933 rbd_obj_request_get(obj_request);
2934 stat_request->obj_request = obj_request;
2935 stat_request->pages = pages;
2936 stat_request->page_count = page_count;
c5b5ef6c
AE
2937 stat_request->callback = rbd_img_obj_exists_callback;
2938
980917fc
ID
2939 rbd_obj_request_submit(stat_request);
2940 return 0;
2941
710214e3
ID
2942fail_stat_request:
2943 rbd_obj_request_put(stat_request);
c5b5ef6c
AE
2944 return ret;
2945}
2946
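/*
 * Return true if the object request can be sent to the osd as-is,
 * i.e. it needs no existence check and no copyup from the parent image.
 */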
70d045f6 2947static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
b454e36d 2948{
058aa991
ID
2949 struct rbd_img_request *img_request = obj_request->img_request;
2950 struct rbd_device *rbd_dev = img_request->rbd_dev;
b454e36d 2951
70d045f6 2952 /* Reads */
1c220881
JD
2953 if (!img_request_write_test(img_request) &&
2954 !img_request_discard_test(img_request))
70d045f6
ID
2955 return true;
2956
2957 /* Non-layered writes */
2958 if (!img_request_layered_test(img_request))
2959 return true;
2960
b454e36d 2961 /*
70d045f6
ID
2962 * Layered writes outside of the parent overlap range don't
2963 * share any data with the parent.
b454e36d 2964 */
70d045f6
ID
2965 if (!obj_request_overlaps_parent(obj_request))
2966 return true;
b454e36d 2967
c622d226
GZ
2968 /*
2969 * Entire-object layered writes - we will overwrite whatever
2970 * parent data there is anyway.
2971 */
2972 if (!obj_request->offset &&
2973 obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2974 return true;
2975
70d045f6
ID
2976 /*
2977 * If the object is known to already exist, its parent data has
2978 * already been copied.
2979 */
2980 if (obj_request_known_test(obj_request) &&
2981 obj_request_exists_test(obj_request))
2982 return true;
2983
2984 return false;
2985}
2986
2987static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2988{
058aa991
ID
2989 rbd_assert(obj_request_img_data_test(obj_request));
2990 rbd_assert(obj_request_type_valid(obj_request->type));
2991 rbd_assert(obj_request->img_request);
2992
70d045f6 2993 if (img_obj_request_simple(obj_request)) {
980917fc
ID
2994 rbd_obj_request_submit(obj_request);
2995 return 0;
b454e36d
AE
2996 }
2997
2998 /*
3d7efd18
AE
2999 * It's a layered write. The target object might exist but
3000 * we may not know that yet. If we know it doesn't exist,
3001 * start by reading the data for the full target object from
3002 * the parent so we can use it for a copyup to the target.
b454e36d 3003 */
70d045f6 3004 if (obj_request_known_test(obj_request))
3d7efd18
AE
3005 return rbd_img_obj_parent_read_full(obj_request);
3006
3007 /* We don't know whether the target exists. Go find out. */
b454e36d
AE
3008
3009 return rbd_img_obj_exists_submit(obj_request);
3010}
3011
bf0d5f50
AE
3012static int rbd_img_request_submit(struct rbd_img_request *img_request)
3013{
bf0d5f50 3014 struct rbd_obj_request *obj_request;
46faeed4 3015 struct rbd_obj_request *next_obj_request;
663ae2cc 3016 int ret = 0;
bf0d5f50 3017
37206ee5 3018 dout("%s: img %p\n", __func__, img_request);
bf0d5f50 3019
663ae2cc
ID
3020 rbd_img_request_get(img_request);
3021 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
b454e36d 3022 ret = rbd_img_obj_request_submit(obj_request);
bf0d5f50 3023 if (ret)
663ae2cc 3024 goto out_put_ireq;
bf0d5f50
AE
3025 }
3026
663ae2cc
ID
3027out_put_ireq:
3028 rbd_img_request_put(img_request);
3029 return ret;
bf0d5f50 3030}
8b3e1a56
AE
3031
3032static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
3033{
3034 struct rbd_obj_request *obj_request;
a9e8ba2c
AE
3035 struct rbd_device *rbd_dev;
3036 u64 obj_end;
02c74fba
AE
3037 u64 img_xferred;
3038 int img_result;
8b3e1a56
AE
3039
3040 rbd_assert(img_request_child_test(img_request));
3041
02c74fba
AE
3042 /* First get what we need from the image request and release it */
3043
8b3e1a56 3044 obj_request = img_request->obj_request;
02c74fba
AE
3045 img_xferred = img_request->xferred;
3046 img_result = img_request->result;
3047 rbd_img_request_put(img_request);
3048
3049 /*
3050 * If the overlap has become 0 (most likely because the
3051 * image has been flattened) we need to re-submit the
3052 * original request.
3053 */
a9e8ba2c
AE
3054 rbd_assert(obj_request);
3055 rbd_assert(obj_request->img_request);
02c74fba
AE
3056 rbd_dev = obj_request->img_request->rbd_dev;
3057 if (!rbd_dev->parent_overlap) {
980917fc
ID
3058 rbd_obj_request_submit(obj_request);
3059 return;
02c74fba 3060 }
a9e8ba2c 3061
02c74fba 3062 obj_request->result = img_result;
a9e8ba2c
AE
3063 if (obj_request->result)
3064 goto out;
3065
3066 /*
3067 * We need to zero anything beyond the parent overlap
3068 * boundary. Since rbd_img_obj_request_read_callback()
3069 * will zero anything beyond the end of a short read, an
3070 * easy way to do this is to pretend the data from the
3071 * parent came up short--ending at the overlap boundary.
3072 */
3073 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
3074 obj_end = obj_request->img_offset + obj_request->length;
a9e8ba2c
AE
3075 if (obj_end > rbd_dev->parent_overlap) {
3076 u64 xferred = 0;
3077
3078 if (obj_request->img_offset < rbd_dev->parent_overlap)
3079 xferred = rbd_dev->parent_overlap -
3080 obj_request->img_offset;
8b3e1a56 3081
02c74fba 3082 obj_request->xferred = min(img_xferred, xferred);
a9e8ba2c 3083 } else {
02c74fba 3084 obj_request->xferred = img_xferred;
a9e8ba2c
AE
3085 }
3086out:
8b3e1a56
AE
3087 rbd_img_obj_request_read_callback(obj_request);
3088 rbd_obj_request_complete(obj_request);
3089}
3090
3091static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
3092{
8b3e1a56
AE
3093 struct rbd_img_request *img_request;
3094 int result;
3095
3096 rbd_assert(obj_request_img_data_test(obj_request));
3097 rbd_assert(obj_request->img_request != NULL);
3098 rbd_assert(obj_request->result == (s32) -ENOENT);
5b2ab72d 3099 rbd_assert(obj_request_type_valid(obj_request->type));
8b3e1a56 3100
8b3e1a56 3101 /* rbd_read_finish(obj_request, obj_request->length); */
e93f3152 3102 img_request = rbd_parent_request_create(obj_request,
8b3e1a56 3103 obj_request->img_offset,
e93f3152 3104 obj_request->length);
8b3e1a56
AE
3105 result = -ENOMEM;
3106 if (!img_request)
3107 goto out_err;
3108
5b2ab72d
AE
3109 if (obj_request->type == OBJ_REQUEST_BIO)
3110 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3111 obj_request->bio_list);
3112 else
3113 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3114 obj_request->pages);
8b3e1a56
AE
3115 if (result)
3116 goto out_err;
3117
3118 img_request->callback = rbd_img_parent_read_callback;
3119 result = rbd_img_request_submit(img_request);
3120 if (result)
3121 goto out_err;
3122
3123 return;
3124out_err:
3125 if (img_request)
3126 rbd_img_request_put(img_request);
3127 obj_request->result = result;
3128 obj_request->xferred = 0;
3129 obj_request_done_set(obj_request);
3130}
bf0d5f50 3131
ed95b21a
ID
3132static const struct rbd_client_id rbd_empty_cid;
3133
3134static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3135 const struct rbd_client_id *rhs)
3136{
3137 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3138}
3139
3140static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3141{
3142 struct rbd_client_id cid;
3143
3144 mutex_lock(&rbd_dev->watch_mutex);
3145 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3146 cid.handle = rbd_dev->watch_cookie;
3147 mutex_unlock(&rbd_dev->watch_mutex);
3148 return cid;
3149}
3150
3151/*
3152 * lock_rwsem must be held for write
3153 */
3154static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3155 const struct rbd_client_id *cid)
3156{
3157 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3158 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3159 cid->gid, cid->handle);
3160 rbd_dev->owner_cid = *cid; /* struct */
3161}
3162
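/*
 * The lock cookie is "<RBD_LOCK_COOKIE_PREFIX> <watch cookie>"; it ties
 * the exclusive lock to our watch, and find_watcher() parses the watch
 * cookie back out of it.
 */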
3163static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3164{
3165 mutex_lock(&rbd_dev->watch_mutex);
3166 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3167 mutex_unlock(&rbd_dev->watch_mutex);
3168}
3169
3170/*
3171 * lock_rwsem must be held for write
3172 */
3173static int rbd_lock(struct rbd_device *rbd_dev)
3174{
3175 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3176 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3177 char cookie[32];
3178 int ret;
3179
3180 WARN_ON(__rbd_is_lock_owner(rbd_dev));
3181
3182 format_lock_cookie(rbd_dev, cookie);
3183 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3184 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3185 RBD_LOCK_TAG, "", 0);
3186 if (ret)
3187 return ret;
3188
3189 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3190 rbd_set_owner_cid(rbd_dev, &cid);
3191 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3192 return 0;
3193}
3194
3195/*
3196 * lock_rwsem must be held for write
3197 */
3198static int rbd_unlock(struct rbd_device *rbd_dev)
b8d70035 3199{
922dab61 3200 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a 3201 char cookie[32];
e627db08 3202 int ret;
b8d70035 3203
ed95b21a
ID
3204 WARN_ON(!__rbd_is_lock_owner(rbd_dev));
3205
3206 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3207
3208 format_lock_cookie(rbd_dev, cookie);
3209 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3210 RBD_LOCK_NAME, cookie);
3211 if (ret && ret != -ENOENT) {
3212 rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
3213 return ret;
3214 }
3215
3216 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3217 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3218 return 0;
3219}
3220
3221static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3222 enum rbd_notify_op notify_op,
3223 struct page ***preply_pages,
3224 size_t *preply_len)
3225{
3226 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3227 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3228 int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
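 /* buf_size: 32-bit notify_op + 64-bit gid + 64-bit handle, plus encoding header */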
3229 char buf[buf_size];
3230 void *p = buf;
3231
3232 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3233
3234 /* encode *LockPayload NotifyMessage (op + ClientId) */
3235 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3236 ceph_encode_32(&p, notify_op);
3237 ceph_encode_64(&p, cid.gid);
3238 ceph_encode_64(&p, cid.handle);
3239
3240 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3241 &rbd_dev->header_oloc, buf, buf_size,
3242 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3243}
3244
3245static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3246 enum rbd_notify_op notify_op)
3247{
3248 struct page **reply_pages;
3249 size_t reply_len;
3250
3251 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3252 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3253}
3254
3255static void rbd_notify_acquired_lock(struct work_struct *work)
3256{
3257 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3258 acquired_lock_work);
3259
3260 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3261}
3262
3263static void rbd_notify_released_lock(struct work_struct *work)
3264{
3265 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3266 released_lock_work);
3267
3268 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3269}
3270
3271static int rbd_request_lock(struct rbd_device *rbd_dev)
3272{
3273 struct page **reply_pages;
3274 size_t reply_len;
3275 bool lock_owner_responded = false;
3276 int ret;
52bb1f9b 3277
ed95b21a
ID
3278 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3279
3280 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3281 &reply_pages, &reply_len);
3282 if (ret && ret != -ETIMEDOUT) {
3283 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3284 goto out;
3285 }
3286
3287 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3288 void *p = page_address(reply_pages[0]);
3289 void *const end = p + reply_len;
3290 u32 n;
3291
3292 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3293 while (n--) {
3294 u8 struct_v;
3295 u32 len;
3296
3297 ceph_decode_need(&p, end, 8 + 8, e_inval);
3298 p += 8 + 8; /* skip gid and cookie */
3299
3300 ceph_decode_32_safe(&p, end, len, e_inval);
3301 if (!len)
3302 continue;
3303
3304 if (lock_owner_responded) {
3305 rbd_warn(rbd_dev,
3306 "duplicate lock owners detected");
3307 ret = -EIO;
3308 goto out;
3309 }
3310
3311 lock_owner_responded = true;
3312 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3313 &struct_v, &len);
3314 if (ret) {
3315 rbd_warn(rbd_dev,
3316 "failed to decode ResponseMessage: %d",
3317 ret);
3318 goto e_inval;
3319 }
3320
3321 ret = ceph_decode_32(&p);
3322 }
3323 }
3324
3325 if (!lock_owner_responded) {
3326 rbd_warn(rbd_dev, "no lock owners detected");
3327 ret = -ETIMEDOUT;
3328 }
3329
3330out:
3331 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3332 return ret;
3333
3334e_inval:
3335 ret = -EINVAL;
3336 goto out;
3337}
3338
3339static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3340{
3341 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3342
3343 cancel_delayed_work(&rbd_dev->lock_dwork);
3344 if (wake_all)
3345 wake_up_all(&rbd_dev->lock_waitq);
3346 else
3347 wake_up(&rbd_dev->lock_waitq);
3348}
3349
3350static int get_lock_owner_info(struct rbd_device *rbd_dev,
3351 struct ceph_locker **lockers, u32 *num_lockers)
3352{
3353 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3354 u8 lock_type;
3355 char *lock_tag;
3356 int ret;
3357
3358 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3359
3360 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3361 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3362 &lock_type, &lock_tag, lockers, num_lockers);
3363 if (ret)
3364 return ret;
3365
3366 if (*num_lockers == 0) {
3367 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3368 goto out;
3369 }
3370
3371 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3372 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3373 lock_tag);
3374 ret = -EBUSY;
3375 goto out;
3376 }
3377
3378 if (lock_type == CEPH_CLS_LOCK_SHARED) {
3379 rbd_warn(rbd_dev, "shared lock type detected");
3380 ret = -EBUSY;
3381 goto out;
3382 }
3383
3384 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3385 strlen(RBD_LOCK_COOKIE_PREFIX))) {
3386 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3387 (*lockers)[0].id.cookie);
3388 ret = -EBUSY;
3389 goto out;
3390 }
3391
3392out:
3393 kfree(lock_tag);
3394 return ret;
3395}
3396
3397static int find_watcher(struct rbd_device *rbd_dev,
3398 const struct ceph_locker *locker)
3399{
3400 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3401 struct ceph_watch_item *watchers;
3402 u32 num_watchers;
3403 u64 cookie;
3404 int i;
3405 int ret;
3406
3407 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3408 &rbd_dev->header_oloc, &watchers,
3409 &num_watchers);
3410 if (ret)
3411 return ret;
3412
3413 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3414 for (i = 0; i < num_watchers; i++) {
3415 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3416 sizeof(locker->info.addr)) &&
3417 watchers[i].cookie == cookie) {
3418 struct rbd_client_id cid = {
3419 .gid = le64_to_cpu(watchers[i].name.num),
3420 .handle = cookie,
3421 };
3422
3423 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3424 rbd_dev, cid.gid, cid.handle);
3425 rbd_set_owner_cid(rbd_dev, &cid);
3426 ret = 1;
3427 goto out;
3428 }
3429 }
3430
3431 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3432 ret = 0;
3433out:
3434 kfree(watchers);
3435 return ret;
3436}
3437
3438/*
3439 * lock_rwsem must be held for write
3440 */
3441static int rbd_try_lock(struct rbd_device *rbd_dev)
3442{
3443 struct ceph_client *client = rbd_dev->rbd_client->client;
3444 struct ceph_locker *lockers;
3445 u32 num_lockers;
3446 int ret;
3447
3448 for (;;) {
3449 ret = rbd_lock(rbd_dev);
3450 if (ret != -EBUSY)
3451 return ret;
3452
3453 /* determine if the current lock holder is still alive */
3454 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3455 if (ret)
3456 return ret;
3457
3458 if (num_lockers == 0)
3459 goto again;
3460
3461 ret = find_watcher(rbd_dev, lockers);
3462 if (ret) {
3463 if (ret > 0)
3464 ret = 0; /* have to request lock */
3465 goto out;
3466 }
3467
3468 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3469 ENTITY_NAME(lockers[0].id.name));
3470
3471 ret = ceph_monc_blacklist_add(&client->monc,
3472 &lockers[0].info.addr);
3473 if (ret) {
3474 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3475 ENTITY_NAME(lockers[0].id.name), ret);
3476 goto out;
3477 }
3478
3479 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3480 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3481 lockers[0].id.cookie,
3482 &lockers[0].id.name);
3483 if (ret && ret != -ENOENT)
3484 goto out;
3485
3486again:
3487 ceph_free_lockers(lockers, num_lockers);
3488 }
3489
3490out:
3491 ceph_free_lockers(lockers, num_lockers);
3492 return ret;
3493}
3494
3495/*
3496 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3497 */
3498static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3499 int *pret)
3500{
3501 enum rbd_lock_state lock_state;
3502
3503 down_read(&rbd_dev->lock_rwsem);
3504 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3505 rbd_dev->lock_state);
3506 if (__rbd_is_lock_owner(rbd_dev)) {
3507 lock_state = rbd_dev->lock_state;
3508 up_read(&rbd_dev->lock_rwsem);
3509 return lock_state;
3510 }
3511
3512 up_read(&rbd_dev->lock_rwsem);
3513 down_write(&rbd_dev->lock_rwsem);
3514 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3515 rbd_dev->lock_state);
3516 if (!__rbd_is_lock_owner(rbd_dev)) {
3517 *pret = rbd_try_lock(rbd_dev);
3518 if (*pret)
3519 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3520 }
3521
3522 lock_state = rbd_dev->lock_state;
3523 up_write(&rbd_dev->lock_rwsem);
3524 return lock_state;
3525}
3526
3527static void rbd_acquire_lock(struct work_struct *work)
3528{
3529 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3530 struct rbd_device, lock_dwork);
3531 enum rbd_lock_state lock_state;
3532 int ret;
3533
3534 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3535again:
3536 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3537 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3538 if (lock_state == RBD_LOCK_STATE_LOCKED)
3539 wake_requests(rbd_dev, true);
3540 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3541 rbd_dev, lock_state, ret);
3542 return;
3543 }
3544
3545 ret = rbd_request_lock(rbd_dev);
3546 if (ret == -ETIMEDOUT) {
3547 goto again; /* treat this as a dead client */
3548 } else if (ret < 0) {
3549 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3550 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3551 RBD_RETRY_DELAY);
3552 } else {
3553 /*
3554 * lock owner acked, but resend if we don't see them
3555 * release the lock
3556 */
3557 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3558 rbd_dev);
3559 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3560 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3561 }
3562}
3563
3564/*
3565 * lock_rwsem must be held for write
3566 */
3567static bool rbd_release_lock(struct rbd_device *rbd_dev)
3568{
3569 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3570 rbd_dev->lock_state);
3571 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3572 return false;
3573
3574 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3575 downgrade_write(&rbd_dev->lock_rwsem);
52bb1f9b 3576 /*
ed95b21a 3577 * Ensure that all in-flight IO is flushed.
52bb1f9b 3578 *
ed95b21a
ID
3579 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3580 * may be shared with other devices.
52bb1f9b 3581 */
ed95b21a
ID
3582 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3583 up_read(&rbd_dev->lock_rwsem);
3584
3585 down_write(&rbd_dev->lock_rwsem);
3586 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3587 rbd_dev->lock_state);
3588 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3589 return false;
3590
3591 if (!rbd_unlock(rbd_dev))
3592 /*
3593 * Give others a chance to grab the lock - we would re-acquire
3594 * almost immediately if we got new IO during ceph_osdc_sync()
3595 * otherwise. We need to ack our own notifications, so this
3596 * lock_dwork will be requeued from rbd_wait_state_locked()
3597 * after wake_requests() in rbd_handle_released_lock().
3598 */
3599 cancel_delayed_work(&rbd_dev->lock_dwork);
3600
3601 return true;
3602}
3603
3604static void rbd_release_lock_work(struct work_struct *work)
3605{
3606 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3607 unlock_work);
3608
3609 down_write(&rbd_dev->lock_rwsem);
3610 rbd_release_lock(rbd_dev);
3611 up_write(&rbd_dev->lock_rwsem);
3612}
3613
3614static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3615 void **p)
3616{
3617 struct rbd_client_id cid = { 0 };
3618
3619 if (struct_v >= 2) {
3620 cid.gid = ceph_decode_64(p);
3621 cid.handle = ceph_decode_64(p);
3622 }
3623
3624 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3625 cid.handle);
3626 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3627 down_write(&rbd_dev->lock_rwsem);
3628 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3629 /*
3630 * we already know that the remote client is
3631 * the owner
3632 */
3633 up_write(&rbd_dev->lock_rwsem);
3634 return;
3635 }
3636
3637 rbd_set_owner_cid(rbd_dev, &cid);
3638 downgrade_write(&rbd_dev->lock_rwsem);
3639 } else {
3640 down_read(&rbd_dev->lock_rwsem);
3641 }
3642
3643 if (!__rbd_is_lock_owner(rbd_dev))
3644 wake_requests(rbd_dev, false);
3645 up_read(&rbd_dev->lock_rwsem);
3646}
3647
3648static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3649 void **p)
3650{
3651 struct rbd_client_id cid = { 0 };
3652
3653 if (struct_v >= 2) {
3654 cid.gid = ceph_decode_64(p);
3655 cid.handle = ceph_decode_64(p);
3656 }
3657
3658 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3659 cid.handle);
3660 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3661 down_write(&rbd_dev->lock_rwsem);
3662 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3663 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3664 __func__, rbd_dev, cid.gid, cid.handle,
3665 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3666 up_write(&rbd_dev->lock_rwsem);
3667 return;
3668 }
3669
3670 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3671 downgrade_write(&rbd_dev->lock_rwsem);
3672 } else {
3673 down_read(&rbd_dev->lock_rwsem);
3674 }
3675
3676 if (!__rbd_is_lock_owner(rbd_dev))
3677 wake_requests(rbd_dev, false);
3678 up_read(&rbd_dev->lock_rwsem);
3679}
3680
3681static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3682 void **p)
3683{
3684 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3685 struct rbd_client_id cid = { 0 };
3686 bool need_to_send;
3687
3688 if (struct_v >= 2) {
3689 cid.gid = ceph_decode_64(p);
3690 cid.handle = ceph_decode_64(p);
3691 }
3692
3693 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3694 cid.handle);
3695 if (rbd_cid_equal(&cid, &my_cid))
3696 return false;
3697
3698 down_read(&rbd_dev->lock_rwsem);
3699 need_to_send = __rbd_is_lock_owner(rbd_dev);
3700 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3701 if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
3702 dout("%s rbd_dev %p queueing unlock_work\n", __func__,
3703 rbd_dev);
3704 queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
3705 }
3706 }
3707 up_read(&rbd_dev->lock_rwsem);
3708 return need_to_send;
3709}
3710
3711static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3712 u64 notify_id, u64 cookie, s32 *result)
3713{
3714 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3715 int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3716 char buf[buf_size];
3717 int ret;
3718
3719 if (result) {
3720 void *p = buf;
3721
3722 /* encode ResponseMessage */
3723 ceph_start_encoding(&p, 1, 1,
3724 buf_size - CEPH_ENCODING_START_BLK_LEN);
3725 ceph_encode_32(&p, *result);
3726 } else {
3727 buf_size = 0;
3728 }
b8d70035 3729
922dab61
ID
3730 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3731 &rbd_dev->header_oloc, notify_id, cookie,
ed95b21a 3732 buf, buf_size);
52bb1f9b 3733 if (ret)
ed95b21a
ID
3734 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3735}
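The ack payload assembled above is a small encoded ResponseMessage: a one-byte struct version, a one-byte compat version, a 32-bit length of the remaining payload, and the 32-bit result itself. A hedged standalone sketch of that layout follows; it assumes CEPH_ENCODING_START_BLK_LEN covers the 1 + 1 + 4 byte header (matching the 10-byte buffer sized above), and the helper names are made up.

/*
 * Illustrative sketch only: the ResponseMessage payload that
 * __rbd_acknowledge_notify() encodes, written with plain stores.
 * Assumes the encoding header is 1-byte version, 1-byte compat and a
 * 4-byte little-endian length; helper names are hypothetical.
 */
#include <stdint.h>

static void put_le32(uint8_t *p, uint32_t v)
{
	p[0] = v; p[1] = v >> 8; p[2] = v >> 16; p[3] = v >> 24;
}

/* buf must have room for 10 bytes; returns the encoded length */
static int encode_response_message(uint8_t *buf, int32_t result)
{
	buf[0] = 1;			/* struct_v */
	buf[1] = 1;			/* struct_compat */
	put_le32(buf + 2, 4);		/* payload length: just the result */
	put_le32(buf + 6, (uint32_t)result);
	return 10;
}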
3736
3737static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3738 u64 cookie)
3739{
3740 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3741 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3742}
3743
3744static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3745 u64 notify_id, u64 cookie, s32 result)
3746{
3747 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3748 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3749}
3750
3751static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3752 u64 notifier_id, void *data, size_t data_len)
3753{
3754 struct rbd_device *rbd_dev = arg;
3755 void *p = data;
3756 void *const end = p + data_len;
3757 u8 struct_v;
3758 u32 len;
3759 u32 notify_op;
3760 int ret;
3761
3762 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3763 __func__, rbd_dev, cookie, notify_id, data_len);
3764 if (data_len) {
3765 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3766 &struct_v, &len);
3767 if (ret) {
3768 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3769 ret);
3770 return;
3771 }
3772
3773 notify_op = ceph_decode_32(&p);
3774 } else {
3775 /* legacy notification for header updates */
3776 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3777 len = 0;
3778 }
3779
3780 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3781 switch (notify_op) {
3782 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3783 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3784 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3785 break;
3786 case RBD_NOTIFY_OP_RELEASED_LOCK:
3787 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3788 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3789 break;
3790 case RBD_NOTIFY_OP_REQUEST_LOCK:
3791 if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
3792 /*
3793 * send ResponseMessage(0) back so the client
3794 * can detect a missing owner
3795 */
3796 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3797 cookie, 0);
3798 else
3799 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3800 break;
3801 case RBD_NOTIFY_OP_HEADER_UPDATE:
3802 ret = rbd_dev_refresh(rbd_dev);
3803 if (ret)
3804 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3805
3806 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3807 break;
3808 default:
3809 if (rbd_is_lock_owner(rbd_dev))
3810 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3811 cookie, -EOPNOTSUPP);
3812 else
3813 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3814 break;
3815 }
b8d70035
AE
3816}
3817
99d16943
ID
3818static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3819
922dab61 3820static void rbd_watch_errcb(void *arg, u64 cookie, int err)
bb040aa0 3821{
922dab61 3822 struct rbd_device *rbd_dev = arg;
bb040aa0 3823
922dab61 3824 rbd_warn(rbd_dev, "encountered watch error: %d", err);
bb040aa0 3825
ed95b21a
ID
3826 down_write(&rbd_dev->lock_rwsem);
3827 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3828 up_write(&rbd_dev->lock_rwsem);
3829
99d16943
ID
3830 mutex_lock(&rbd_dev->watch_mutex);
3831 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3832 __rbd_unregister_watch(rbd_dev);
3833 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
bb040aa0 3834
99d16943 3835 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
bb040aa0 3836 }
99d16943 3837 mutex_unlock(&rbd_dev->watch_mutex);
bb040aa0
ID
3838}
3839
9969ebc5 3840/*
99d16943 3841 * watch_mutex must be locked
9969ebc5 3842 */
99d16943 3843static int __rbd_register_watch(struct rbd_device *rbd_dev)
9969ebc5
AE
3844{
3845 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
922dab61 3846 struct ceph_osd_linger_request *handle;
9969ebc5 3847
922dab61 3848 rbd_assert(!rbd_dev->watch_handle);
99d16943 3849 dout("%s rbd_dev %p\n", __func__, rbd_dev);
9969ebc5 3850
922dab61
ID
3851 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3852 &rbd_dev->header_oloc, rbd_watch_cb,
3853 rbd_watch_errcb, rbd_dev);
3854 if (IS_ERR(handle))
3855 return PTR_ERR(handle);
8eb87565 3856
922dab61 3857 rbd_dev->watch_handle = handle;
b30a01f2 3858 return 0;
b30a01f2
ID
3859}
3860
99d16943
ID
3861/*
3862 * watch_mutex must be locked
3863 */
3864static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
b30a01f2 3865{
922dab61
ID
3866 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3867 int ret;
b30a01f2 3868
99d16943
ID
3869 rbd_assert(rbd_dev->watch_handle);
3870 dout("%s rbd_dev %p\n", __func__, rbd_dev);
b30a01f2 3871
922dab61
ID
3872 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3873 if (ret)
3874 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
76756a51 3875
922dab61 3876 rbd_dev->watch_handle = NULL;
c525f036
ID
3877}
3878
99d16943
ID
3879static int rbd_register_watch(struct rbd_device *rbd_dev)
3880{
3881 int ret;
3882
3883 mutex_lock(&rbd_dev->watch_mutex);
3884 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3885 ret = __rbd_register_watch(rbd_dev);
3886 if (ret)
3887 goto out;
3888
3889 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3890 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3891
3892out:
3893 mutex_unlock(&rbd_dev->watch_mutex);
3894 return ret;
3895}
3896
3897static void cancel_tasks_sync(struct rbd_device *rbd_dev)
c525f036 3898{
99d16943
ID
3899 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3900
3901 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
ed95b21a
ID
3902 cancel_work_sync(&rbd_dev->acquired_lock_work);
3903 cancel_work_sync(&rbd_dev->released_lock_work);
3904 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3905 cancel_work_sync(&rbd_dev->unlock_work);
99d16943
ID
3906}
3907
3908static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3909{
ed95b21a 3910 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
99d16943
ID
3911 cancel_tasks_sync(rbd_dev);
3912
3913 mutex_lock(&rbd_dev->watch_mutex);
3914 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3915 __rbd_unregister_watch(rbd_dev);
3916 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3917 mutex_unlock(&rbd_dev->watch_mutex);
811c6688 3918
811c6688 3919 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
fca27065
ID
3920}
3921
99d16943
ID
3922static void rbd_reregister_watch(struct work_struct *work)
3923{
3924 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3925 struct rbd_device, watch_dwork);
ed95b21a 3926 bool was_lock_owner = false;
99d16943
ID
3927 int ret;
3928
3929 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3930
ed95b21a
ID
3931 down_write(&rbd_dev->lock_rwsem);
3932 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3933 was_lock_owner = rbd_release_lock(rbd_dev);
3934
99d16943
ID
3935 mutex_lock(&rbd_dev->watch_mutex);
3936 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR)
3937 goto fail_unlock;
3938
3939 ret = __rbd_register_watch(rbd_dev);
3940 if (ret) {
3941 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3942 if (ret != -EBLACKLISTED)
3943 queue_delayed_work(rbd_dev->task_wq,
3944 &rbd_dev->watch_dwork,
3945 RBD_RETRY_DELAY);
3946 goto fail_unlock;
3947 }
3948
3949 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3950 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3951 mutex_unlock(&rbd_dev->watch_mutex);
3952
3953 ret = rbd_dev_refresh(rbd_dev);
3954 if (ret)
3955 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3956
ed95b21a
ID
3957 if (was_lock_owner) {
3958 ret = rbd_try_lock(rbd_dev);
3959 if (ret)
3960 rbd_warn(rbd_dev, "reregistration lock failed: %d",
3961 ret);
3962 }
3963
3964 up_write(&rbd_dev->lock_rwsem);
3965 wake_requests(rbd_dev, true);
99d16943
ID
3966 return;
3967
3968fail_unlock:
3969 mutex_unlock(&rbd_dev->watch_mutex);
ed95b21a 3970 up_write(&rbd_dev->lock_rwsem);
99d16943
ID
3971}
3972
36be9a76 3973/*
f40eb349
AE
3974 * Synchronous osd object method call. Returns the number of bytes
3975 * returned in the inbound buffer, or a negative error code.
36be9a76
AE
3976 */
3977static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3978 const char *object_name,
3979 const char *class_name,
3980 const char *method_name,
4157976b 3981 const void *outbound,
36be9a76 3982 size_t outbound_size,
4157976b 3983 void *inbound,
e2a58ee5 3984 size_t inbound_size)
36be9a76
AE
3985{
3986 struct rbd_obj_request *obj_request;
36be9a76
AE
3987 struct page **pages;
3988 u32 page_count;
3989 int ret;
3990
3991 /*
6010a451
AE
3992 * Method calls are ultimately read operations. The result
3993 * should be placed into the inbound buffer provided. They
3994 * also supply outbound data--parameters for the object
3995 * method. Currently if this is present it will be a
3996 * snapshot id.
36be9a76 3997 */
57385b51 3998 page_count = (u32)calc_pages_for(0, inbound_size);
36be9a76
AE
3999 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
4000 if (IS_ERR(pages))
4001 return PTR_ERR(pages);
4002
4003 ret = -ENOMEM;
6010a451 4004 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
36be9a76
AE
4005 OBJ_REQUEST_PAGES);
4006 if (!obj_request)
4007 goto out;
4008
4009 obj_request->pages = pages;
4010 obj_request->page_count = page_count;
4011
6d2940c8 4012 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
deb236b3 4013 obj_request);
36be9a76
AE
4014 if (!obj_request->osd_req)
4015 goto out;
4016
c99d2d4a 4017 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
04017e29
AE
4018 class_name, method_name);
4019 if (outbound_size) {
4020 struct ceph_pagelist *pagelist;
4021
4022 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
4023 if (!pagelist)
4024 goto out;
4025
4026 ceph_pagelist_init(pagelist);
4027 ceph_pagelist_append(pagelist, outbound, outbound_size);
4028 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
4029 pagelist);
4030 }
a4ce40a9
AE
4031 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
4032 obj_request->pages, inbound_size,
44cd188d 4033 0, false, false);
430c28c3 4034
980917fc 4035 rbd_obj_request_submit(obj_request);
36be9a76
AE
4036 ret = rbd_obj_request_wait(obj_request);
4037 if (ret)
4038 goto out;
4039
4040 ret = obj_request->result;
4041 if (ret < 0)
4042 goto out;
57385b51
AE
4043
4044 rbd_assert(obj_request->xferred < (u64)INT_MAX);
4045 ret = (int)obj_request->xferred;
903bb32e 4046 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
36be9a76
AE
4047out:
4048 if (obj_request)
4049 rbd_obj_request_put(obj_request);
4050 else
4051 ceph_release_page_vector(pages, page_count);
4052
4053 return ret;
4054}
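For reference, the typical call pattern for rbd_obj_method_sync() is the one used by the v2 metadata helpers later in this file (for example _rbd_dev_v2_snap_size()): encode any parameters into an outbound buffer, pass an inbound buffer large enough for the reply, and treat a short positive return as -ERANGE. A condensed, illustrative fragment (not a complete function, error handling trimmed):

	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				  "rbd", "get_size",
				  &snapid, sizeof (snapid),
				  &size_buf, sizeof (size_buf));
	if (ret < 0)
		return ret;
	if (ret < sizeof (size_buf))
		return -ERANGE;
	dout("image size %llu\n", le64_to_cpu(size_buf.size));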
4055
ed95b21a
ID
4056/*
4057 * lock_rwsem must be held for read
4058 */
4059static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
4060{
4061 DEFINE_WAIT(wait);
4062
4063 do {
4064 /*
4065 * Note the use of mod_delayed_work() in rbd_acquire_lock()
4066 * and cancel_delayed_work() in wake_requests().
4067 */
4068 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4069 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4070 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4071 TASK_UNINTERRUPTIBLE);
4072 up_read(&rbd_dev->lock_rwsem);
4073 schedule();
4074 down_read(&rbd_dev->lock_rwsem);
4075 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
4076 finish_wait(&rbd_dev->lock_waitq, &wait);
4077}
4078
7ad18afa 4079static void rbd_queue_workfn(struct work_struct *work)
bf0d5f50 4080{
7ad18afa
CH
4081 struct request *rq = blk_mq_rq_from_pdu(work);
4082 struct rbd_device *rbd_dev = rq->q->queuedata;
bc1ecc65 4083 struct rbd_img_request *img_request;
4e752f0a 4084 struct ceph_snap_context *snapc = NULL;
bc1ecc65
ID
4085 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4086 u64 length = blk_rq_bytes(rq);
6d2940c8 4087 enum obj_operation_type op_type;
4e752f0a 4088 u64 mapping_size;
80de1912 4089 bool must_be_locked;
bf0d5f50
AE
4090 int result;
4091
7ad18afa
CH
4092 if (rq->cmd_type != REQ_TYPE_FS) {
4093 dout("%s: non-fs request type %d\n", __func__,
4094 (int) rq->cmd_type);
4095 result = -EIO;
4096 goto err;
4097 }
4098
c2df40df 4099 if (req_op(rq) == REQ_OP_DISCARD)
90e98c52 4100 op_type = OBJ_OP_DISCARD;
c2df40df 4101 else if (req_op(rq) == REQ_OP_WRITE)
6d2940c8
GZ
4102 op_type = OBJ_OP_WRITE;
4103 else
4104 op_type = OBJ_OP_READ;
4105
bc1ecc65 4106 /* Ignore/skip any zero-length requests */
bf0d5f50 4107
bc1ecc65
ID
4108 if (!length) {
4109 dout("%s: zero-length request\n", __func__);
4110 result = 0;
4111 goto err_rq;
4112 }
bf0d5f50 4113
6d2940c8 4114 /* Only reads are allowed to a read-only device */
bc1ecc65 4115
6d2940c8 4116 if (op_type != OBJ_OP_READ) {
bc1ecc65
ID
4117 if (rbd_dev->mapping.read_only) {
4118 result = -EROFS;
4119 goto err_rq;
4dda41d3 4120 }
bc1ecc65
ID
4121 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4122 }
4dda41d3 4123
bc1ecc65
ID
4124 /*
4125 * Quit early if the mapped snapshot no longer exists. It's
4126 * still possible the snapshot will have disappeared by the
4127 * time our request arrives at the osd, but there's no sense in
4128 * sending it if we already know.
4129 */
4130 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4131 dout("request for non-existent snapshot");
4132 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4133 result = -ENXIO;
4134 goto err_rq;
4135 }
4dda41d3 4136
bc1ecc65
ID
4137 if (offset && length > U64_MAX - offset + 1) {
4138 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4139 length);
4140 result = -EINVAL;
4141 goto err_rq; /* Shouldn't happen */
4142 }
4dda41d3 4143
7ad18afa
CH
4144 blk_mq_start_request(rq);
4145
4e752f0a
JD
4146 down_read(&rbd_dev->header_rwsem);
4147 mapping_size = rbd_dev->mapping.size;
6d2940c8 4148 if (op_type != OBJ_OP_READ) {
4e752f0a
JD
4149 snapc = rbd_dev->header.snapc;
4150 ceph_get_snap_context(snapc);
ed95b21a 4151 must_be_locked = rbd_is_lock_supported(rbd_dev);
80de1912
ID
4152 } else {
4153 must_be_locked = rbd_dev->opts->lock_on_read &&
4154 rbd_is_lock_supported(rbd_dev);
4e752f0a
JD
4155 }
4156 up_read(&rbd_dev->header_rwsem);
4157
4158 if (offset + length > mapping_size) {
bc1ecc65 4159 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4e752f0a 4160 length, mapping_size);
bc1ecc65
ID
4161 result = -EIO;
4162 goto err_rq;
4163 }
bf0d5f50 4164
ed95b21a
ID
4165 if (must_be_locked) {
4166 down_read(&rbd_dev->lock_rwsem);
4167 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4168 rbd_wait_state_locked(rbd_dev);
4169 }
4170
6d2940c8 4171 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4e752f0a 4172 snapc);
bc1ecc65
ID
4173 if (!img_request) {
4174 result = -ENOMEM;
ed95b21a 4175 goto err_unlock;
bc1ecc65
ID
4176 }
4177 img_request->rq = rq;
70b16db8 4178 snapc = NULL; /* img_request consumes a ref */
bf0d5f50 4179
90e98c52
GZ
4180 if (op_type == OBJ_OP_DISCARD)
4181 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4182 NULL);
4183 else
4184 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4185 rq->bio);
bc1ecc65
ID
4186 if (result)
4187 goto err_img_request;
bf0d5f50 4188
bc1ecc65
ID
4189 result = rbd_img_request_submit(img_request);
4190 if (result)
4191 goto err_img_request;
bf0d5f50 4192
ed95b21a
ID
4193 if (must_be_locked)
4194 up_read(&rbd_dev->lock_rwsem);
bc1ecc65 4195 return;
bf0d5f50 4196
bc1ecc65
ID
4197err_img_request:
4198 rbd_img_request_put(img_request);
ed95b21a
ID
4199err_unlock:
4200 if (must_be_locked)
4201 up_read(&rbd_dev->lock_rwsem);
bc1ecc65
ID
4202err_rq:
4203 if (result)
4204 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
6d2940c8 4205 obj_op_name(op_type), length, offset, result);
e96a650a 4206 ceph_put_snap_context(snapc);
7ad18afa
CH
4207err:
4208 blk_mq_end_request(rq, result);
bc1ecc65 4209}
bf0d5f50 4210
7ad18afa
CH
4211static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4212 const struct blk_mq_queue_data *bd)
bc1ecc65 4213{
7ad18afa
CH
4214 struct request *rq = bd->rq;
4215 struct work_struct *work = blk_mq_rq_to_pdu(rq);
bf0d5f50 4216
7ad18afa
CH
4217 queue_work(rbd_wq, work);
4218 return BLK_MQ_RQ_QUEUE_OK;
bf0d5f50
AE
4219}
4220
602adf40
YS
4221static void rbd_free_disk(struct rbd_device *rbd_dev)
4222{
4223 struct gendisk *disk = rbd_dev->disk;
4224
4225 if (!disk)
4226 return;
4227
a0cab924
AE
4228 rbd_dev->disk = NULL;
4229 if (disk->flags & GENHD_FL_UP) {
602adf40 4230 del_gendisk(disk);
a0cab924
AE
4231 if (disk->queue)
4232 blk_cleanup_queue(disk->queue);
7ad18afa 4233 blk_mq_free_tag_set(&rbd_dev->tag_set);
a0cab924 4234 }
602adf40
YS
4235 put_disk(disk);
4236}
4237
788e2df3
AE
4238static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4239 const char *object_name,
7097f8df 4240 u64 offset, u64 length, void *buf)
788e2df3
AE
4241
4242{
788e2df3 4243 struct rbd_obj_request *obj_request;
788e2df3
AE
4244 struct page **pages = NULL;
4245 u32 page_count;
1ceae7ef 4246 size_t size;
788e2df3
AE
4247 int ret;
4248
4249 page_count = (u32) calc_pages_for(offset, length);
4250 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
4251 if (IS_ERR(pages))
a8d42056 4252 return PTR_ERR(pages);
788e2df3
AE
4253
4254 ret = -ENOMEM;
4255 obj_request = rbd_obj_request_create(object_name, offset, length,
36be9a76 4256 OBJ_REQUEST_PAGES);
788e2df3
AE
4257 if (!obj_request)
4258 goto out;
4259
4260 obj_request->pages = pages;
4261 obj_request->page_count = page_count;
4262
6d2940c8 4263 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
deb236b3 4264 obj_request);
788e2df3
AE
4265 if (!obj_request->osd_req)
4266 goto out;
4267
c99d2d4a
AE
4268 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
4269 offset, length, 0, 0);
406e2c9f 4270 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
a4ce40a9 4271 obj_request->pages,
44cd188d
AE
4272 obj_request->length,
4273 obj_request->offset & ~PAGE_MASK,
4274 false, false);
430c28c3 4275
980917fc 4276 rbd_obj_request_submit(obj_request);
788e2df3
AE
4277 ret = rbd_obj_request_wait(obj_request);
4278 if (ret)
4279 goto out;
4280
4281 ret = obj_request->result;
4282 if (ret < 0)
4283 goto out;
1ceae7ef
AE
4284
4285 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
4286 size = (size_t) obj_request->xferred;
903bb32e 4287 ceph_copy_from_page_vector(pages, buf, 0, size);
7097f8df
AE
4288 rbd_assert(size <= (size_t)INT_MAX);
4289 ret = (int)size;
788e2df3
AE
4290out:
4291 if (obj_request)
4292 rbd_obj_request_put(obj_request);
4293 else
4294 ceph_release_page_vector(pages, page_count);
4295
4296 return ret;
4297}
4298
602adf40 4299/*
662518b1
AE
4300 * Read the complete header for the given rbd device. On successful
4301 * return, the rbd_dev->header field will contain up-to-date
4302 * information about the image.
602adf40 4303 */
99a41ebc 4304static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
602adf40 4305{
4156d998 4306 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 4307 u32 snap_count = 0;
4156d998
AE
4308 u64 names_size = 0;
4309 u32 want_count;
4310 int ret;
602adf40 4311
00f1f36f 4312 /*
4156d998
AE
4313 * The complete header will include an array of its 64-bit
4314 * snapshot ids, followed by the names of those snapshots as
4315 * a contiguous block of NUL-terminated strings. Note that
4316 * the number of snapshots could change by the time we read
4317 * it in, in which case we re-read it.
00f1f36f 4318 */
4156d998
AE
4319 do {
4320 size_t size;
4321
4322 kfree(ondisk);
4323
4324 size = sizeof (*ondisk);
4325 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4326 size += names_size;
4327 ondisk = kmalloc(size, GFP_KERNEL);
4328 if (!ondisk)
662518b1 4329 return -ENOMEM;
4156d998 4330
c41d13a3 4331 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_oid.name,
7097f8df 4332 0, size, ondisk);
4156d998 4333 if (ret < 0)
662518b1 4334 goto out;
c0cd10db 4335 if ((size_t)ret < size) {
4156d998 4336 ret = -ENXIO;
06ecc6cb
AE
4337 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4338 size, ret);
662518b1 4339 goto out;
4156d998
AE
4340 }
4341 if (!rbd_dev_ondisk_valid(ondisk)) {
4342 ret = -ENXIO;
06ecc6cb 4343 rbd_warn(rbd_dev, "invalid header");
662518b1 4344 goto out;
81e759fb 4345 }
602adf40 4346
4156d998
AE
4347 names_size = le64_to_cpu(ondisk->snap_names_len);
4348 want_count = snap_count;
4349 snap_count = le32_to_cpu(ondisk->snap_count);
4350 } while (snap_count != want_count);
00f1f36f 4351
662518b1
AE
4352 ret = rbd_header_from_disk(rbd_dev, ondisk);
4353out:
4156d998
AE
4354 kfree(ondisk);
4355
4356 return ret;
602adf40
YS
4357}
4358
15228ede
AE
4359/*
4360 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4361 * has disappeared from the (just updated) snapshot context.
4362 */
4363static void rbd_exists_validate(struct rbd_device *rbd_dev)
4364{
4365 u64 snap_id;
4366
4367 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4368 return;
4369
4370 snap_id = rbd_dev->spec->snap_id;
4371 if (snap_id == CEPH_NOSNAP)
4372 return;
4373
4374 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4375 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4376}
4377
9875201e
JD
4378static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4379{
4380 sector_t size;
9875201e
JD
4381
4382 /*
811c6688
ID
4383 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4384 * try to update its size. If REMOVING is set, updating size
4385 * is just useless work since the device can't be opened.
9875201e 4386 */
811c6688
ID
4387 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4388 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
9875201e
JD
4389 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4390 dout("setting size to %llu sectors", (unsigned long long)size);
4391 set_capacity(rbd_dev->disk, size);
4392 revalidate_disk(rbd_dev->disk);
4393 }
4394}
4395
cc4a38bd 4396static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 4397{
e627db08 4398 u64 mapping_size;
1fe5e993
AE
4399 int ret;
4400
cfbf6377 4401 down_write(&rbd_dev->header_rwsem);
3b5cf2a2 4402 mapping_size = rbd_dev->mapping.size;
a720ae09
ID
4403
4404 ret = rbd_dev_header_info(rbd_dev);
52bb1f9b 4405 if (ret)
73e39e4d 4406 goto out;
15228ede 4407
e8f59b59
ID
4408 /*
4409 * If there is a parent, see if it has disappeared due to the
4410 * mapped image getting flattened.
4411 */
4412 if (rbd_dev->parent) {
4413 ret = rbd_dev_v2_parent_info(rbd_dev);
4414 if (ret)
73e39e4d 4415 goto out;
e8f59b59
ID
4416 }
4417
5ff1108c 4418 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
73e39e4d 4419 rbd_dev->mapping.size = rbd_dev->header.image_size;
5ff1108c
ID
4420 } else {
4421 /* validate mapped snapshot's EXISTS flag */
4422 rbd_exists_validate(rbd_dev);
4423 }
15228ede 4424
73e39e4d 4425out:
cfbf6377 4426 up_write(&rbd_dev->header_rwsem);
73e39e4d 4427 if (!ret && mapping_size != rbd_dev->mapping.size)
9875201e 4428 rbd_dev_update_size(rbd_dev);
1fe5e993 4429
73e39e4d 4430 return ret;
1fe5e993
AE
4431}
4432
7ad18afa
CH
4433static int rbd_init_request(void *data, struct request *rq,
4434 unsigned int hctx_idx, unsigned int request_idx,
4435 unsigned int numa_node)
4436{
4437 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4438
4439 INIT_WORK(work, rbd_queue_workfn);
4440 return 0;
4441}
4442
4443static struct blk_mq_ops rbd_mq_ops = {
4444 .queue_rq = rbd_queue_rq,
4445 .map_queue = blk_mq_map_queue,
4446 .init_request = rbd_init_request,
4447};
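rbd_init_request() and rbd_queue_rq() above, together with the cmd_size set on the tag_set in rbd_init_disk() below, rely on blk-mq reserving cmd_size bytes of per-request payload directly behind each struct request, so blk_mq_rq_to_pdu() and blk_mq_rq_from_pdu() reduce to pointer arithmetic. A standalone userspace model of that pattern, with fake types (everything here is illustrative, not kernel API):

/*
 * Illustrative standalone model: one allocation holds the request plus
 * its per-request payload, so converting between the two is just
 * pointer arithmetic, which is why the work_struct stashed per request
 * above can be mapped back to its request in rbd_queue_workfn().
 */
#include <stdio.h>
#include <stdlib.h>

struct fake_request { int tag; };
struct fake_work { int pending; };

static void *rq_to_pdu(struct fake_request *rq)
{
	return rq + 1;		/* payload sits right after the request */
}

static struct fake_request *rq_from_pdu(void *pdu)
{
	return (struct fake_request *)pdu - 1;
}

int main(void)
{
	/* one allocation covers the request plus its cmd_size payload */
	struct fake_request *rq = malloc(sizeof(*rq) + sizeof(struct fake_work));
	struct fake_work *work;

	if (!rq)
		return 1;
	rq->tag = 42;
	work = rq_to_pdu(rq);
	work->pending = 1;
	printf("tag recovered from pdu: %d\n", rq_from_pdu(work)->tag); /* 42 */
	free(rq);
	return 0;
}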
4448
602adf40
YS
4449static int rbd_init_disk(struct rbd_device *rbd_dev)
4450{
4451 struct gendisk *disk;
4452 struct request_queue *q;
593a9e7b 4453 u64 segment_size;
7ad18afa 4454 int err;
602adf40 4455
602adf40 4456 /* create gendisk info */
7e513d43
ID
4457 disk = alloc_disk(single_major ?
4458 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4459 RBD_MINORS_PER_MAJOR);
602adf40 4460 if (!disk)
1fcdb8aa 4461 return -ENOMEM;
602adf40 4462
f0f8cef5 4463 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 4464 rbd_dev->dev_id);
602adf40 4465 disk->major = rbd_dev->major;
dd82fff1 4466 disk->first_minor = rbd_dev->minor;
7e513d43
ID
4467 if (single_major)
4468 disk->flags |= GENHD_FL_EXT_DEVT;
602adf40
YS
4469 disk->fops = &rbd_bd_ops;
4470 disk->private_data = rbd_dev;
4471
7ad18afa
CH
4472 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4473 rbd_dev->tag_set.ops = &rbd_mq_ops;
b5584180 4474 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
7ad18afa 4475 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
b5584180 4476 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
7ad18afa
CH
4477 rbd_dev->tag_set.nr_hw_queues = 1;
4478 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4479
4480 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4481 if (err)
602adf40 4482 goto out_disk;
029bcbd8 4483
7ad18afa
CH
4484 q = blk_mq_init_queue(&rbd_dev->tag_set);
4485 if (IS_ERR(q)) {
4486 err = PTR_ERR(q);
4487 goto out_tag_set;
4488 }
4489
d8a2c89c
ID
4490 queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4491 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
593a9e7b 4492
029bcbd8 4493 /* set io sizes to object size */
593a9e7b
AE
4494 segment_size = rbd_obj_bytes(&rbd_dev->header);
4495 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
0d9fde4f 4496 q->limits.max_sectors = queue_max_hw_sectors(q);
d3834fef 4497 blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
593a9e7b
AE
4498 blk_queue_max_segment_size(q, segment_size);
4499 blk_queue_io_min(q, segment_size);
4500 blk_queue_io_opt(q, segment_size);
029bcbd8 4501
90e98c52
GZ
4502 /* enable the discard support */
4503 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4504 q->limits.discard_granularity = segment_size;
4505 q->limits.discard_alignment = segment_size;
2bb4cd5c 4506 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
b76f8239 4507 q->limits.discard_zeroes_data = 1;
90e98c52 4508
bae818ee
RH
4509 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4510 q->backing_dev_info.capabilities |= BDI_CAP_STABLE_WRITES;
4511
602adf40
YS
4512 disk->queue = q;
4513
4514 q->queuedata = rbd_dev;
4515
4516 rbd_dev->disk = disk;
602adf40 4517
602adf40 4518 return 0;
7ad18afa
CH
4519out_tag_set:
4520 blk_mq_free_tag_set(&rbd_dev->tag_set);
602adf40
YS
4521out_disk:
4522 put_disk(disk);
7ad18afa 4523 return err;
602adf40
YS
4524}
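All of the queue limits above are derived from the object size returned by rbd_obj_bytes(). As a worked example (illustrative only, assuming the common 4 MiB object size rather than any particular image):

/*
 * Worked arithmetic only: with a hypothetical 4 MiB object size
 * (order 22), the limits configured in rbd_init_disk() become:
 *   max_hw_sectors / max_segments            = 4 MiB / 512 = 8192
 *   max_segment_size / io_min / io_opt       = 4 MiB
 *   discard_granularity / discard_alignment  = 4 MiB
 */
#include <stdio.h>

int main(void)
{
	unsigned long long segment_size = 1ULL << 22;	/* hypothetical 4 MiB object */

	printf("max_hw_sectors = %llu\n", segment_size / 512);	/* 8192 */
	printf("io_min/io_opt  = %llu bytes\n", segment_size);	/* 4194304 */
	return 0;
}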
4525
dfc5606d
YS
4526/*
4527 sysfs
4528*/
4529
593a9e7b
AE
4530static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4531{
4532 return container_of(dev, struct rbd_device, dev);
4533}
4534
dfc5606d
YS
4535static ssize_t rbd_size_show(struct device *dev,
4536 struct device_attribute *attr, char *buf)
4537{
593a9e7b 4538 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 4539
fc71d833
AE
4540 return sprintf(buf, "%llu\n",
4541 (unsigned long long)rbd_dev->mapping.size);
dfc5606d
YS
4542}
4543
34b13184
AE
4544/*
4545 * Note this shows the features for whatever's mapped, which is not
4546 * necessarily the base image.
4547 */
4548static ssize_t rbd_features_show(struct device *dev,
4549 struct device_attribute *attr, char *buf)
4550{
4551 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4552
4553 return sprintf(buf, "0x%016llx\n",
fc71d833 4554 (unsigned long long)rbd_dev->mapping.features);
34b13184
AE
4555}
4556
dfc5606d
YS
4557static ssize_t rbd_major_show(struct device *dev,
4558 struct device_attribute *attr, char *buf)
4559{
593a9e7b 4560 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4561
fc71d833
AE
4562 if (rbd_dev->major)
4563 return sprintf(buf, "%d\n", rbd_dev->major);
4564
4565 return sprintf(buf, "(none)\n");
dd82fff1
ID
4566}
4567
4568static ssize_t rbd_minor_show(struct device *dev,
4569 struct device_attribute *attr, char *buf)
4570{
4571 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
fc71d833 4572
dd82fff1 4573 return sprintf(buf, "%d\n", rbd_dev->minor);
dfc5606d
YS
4574}
4575
005a07bf
ID
4576static ssize_t rbd_client_addr_show(struct device *dev,
4577 struct device_attribute *attr, char *buf)
4578{
4579 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4580 struct ceph_entity_addr *client_addr =
4581 ceph_client_addr(rbd_dev->rbd_client->client);
4582
4583 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4584 le32_to_cpu(client_addr->nonce));
4585}
4586
dfc5606d
YS
4587static ssize_t rbd_client_id_show(struct device *dev,
4588 struct device_attribute *attr, char *buf)
602adf40 4589{
593a9e7b 4590 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4591
1dbb4399 4592 return sprintf(buf, "client%lld\n",
033268a5 4593 ceph_client_gid(rbd_dev->rbd_client->client));
602adf40
YS
4594}
4595
267fb90b
MC
4596static ssize_t rbd_cluster_fsid_show(struct device *dev,
4597 struct device_attribute *attr, char *buf)
4598{
4599 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4600
4601 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4602}
4603
0d6d1e9c
MC
4604static ssize_t rbd_config_info_show(struct device *dev,
4605 struct device_attribute *attr, char *buf)
4606{
4607 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4608
4609 return sprintf(buf, "%s\n", rbd_dev->config_info);
4610}
4611
dfc5606d
YS
4612static ssize_t rbd_pool_show(struct device *dev,
4613 struct device_attribute *attr, char *buf)
602adf40 4614{
593a9e7b 4615 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4616
0d7dbfce 4617 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
4618}
4619
9bb2f334
AE
4620static ssize_t rbd_pool_id_show(struct device *dev,
4621 struct device_attribute *attr, char *buf)
4622{
4623 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4624
0d7dbfce 4625 return sprintf(buf, "%llu\n",
fc71d833 4626 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
4627}
4628
dfc5606d
YS
4629static ssize_t rbd_name_show(struct device *dev,
4630 struct device_attribute *attr, char *buf)
4631{
593a9e7b 4632 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4633
a92ffdf8
AE
4634 if (rbd_dev->spec->image_name)
4635 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4636
4637 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
4638}
4639
589d30e0
AE
4640static ssize_t rbd_image_id_show(struct device *dev,
4641 struct device_attribute *attr, char *buf)
4642{
4643 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4644
0d7dbfce 4645 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
4646}
4647
34b13184
AE
4648/*
4649 * Shows the name of the currently-mapped snapshot (or
4650 * RBD_SNAP_HEAD_NAME for the base image).
4651 */
dfc5606d
YS
4652static ssize_t rbd_snap_show(struct device *dev,
4653 struct device_attribute *attr,
4654 char *buf)
4655{
593a9e7b 4656 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4657
0d7dbfce 4658 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
4659}
4660
92a58671
MC
4661static ssize_t rbd_snap_id_show(struct device *dev,
4662 struct device_attribute *attr, char *buf)
4663{
4664 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4665
4666 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4667}
4668
86b00e0d 4669/*
ff96128f
ID
4670 * For a v2 image, shows the chain of parent images, separated by empty
4671 * lines. For v1 images or if there is no parent, shows "(no parent
4672 * image)".
86b00e0d
AE
4673 */
4674static ssize_t rbd_parent_show(struct device *dev,
ff96128f
ID
4675 struct device_attribute *attr,
4676 char *buf)
86b00e0d
AE
4677{
4678 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
ff96128f 4679 ssize_t count = 0;
86b00e0d 4680
ff96128f 4681 if (!rbd_dev->parent)
86b00e0d
AE
4682 return sprintf(buf, "(no parent image)\n");
4683
ff96128f
ID
4684 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4685 struct rbd_spec *spec = rbd_dev->parent_spec;
4686
4687 count += sprintf(&buf[count], "%s"
4688 "pool_id %llu\npool_name %s\n"
4689 "image_id %s\nimage_name %s\n"
4690 "snap_id %llu\nsnap_name %s\n"
4691 "overlap %llu\n",
4692 !count ? "" : "\n", /* first? */
4693 spec->pool_id, spec->pool_name,
4694 spec->image_id, spec->image_name ?: "(unknown)",
4695 spec->snap_id, spec->snap_name,
4696 rbd_dev->parent_overlap);
4697 }
4698
4699 return count;
86b00e0d
AE
4700}
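Given the sprintf format above, reading the parent attribute for a hypothetical two-level clone chain would produce something like the following; every value below is made up for illustration, and successive entries are separated by a blank line:

    pool_id 2
    pool_name rbd
    image_id 1021643c9869
    image_name base-image
    snap_id 4
    snap_name snap1
    overlap 10737418240

    pool_id 2
    pool_name rbd
    image_id 100774b0dc51
    image_name golden-image
    snap_id 11
    snap_name snap2
    overlap 4294967296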
4701
dfc5606d
YS
4702static ssize_t rbd_image_refresh(struct device *dev,
4703 struct device_attribute *attr,
4704 const char *buf,
4705 size_t size)
4706{
593a9e7b 4707 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 4708 int ret;
602adf40 4709
cc4a38bd 4710 ret = rbd_dev_refresh(rbd_dev);
e627db08 4711 if (ret)
52bb1f9b 4712 return ret;
b813623a 4713
52bb1f9b 4714 return size;
dfc5606d 4715}
602adf40 4716
dfc5606d 4717static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 4718static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d 4719static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
dd82fff1 4720static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
005a07bf 4721static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
dfc5606d 4722static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
267fb90b 4723static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
0d6d1e9c 4724static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
dfc5606d 4725static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 4726static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 4727static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 4728static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
4729static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4730static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
92a58671 4731static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
86b00e0d 4732static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
4733
4734static struct attribute *rbd_attrs[] = {
4735 &dev_attr_size.attr,
34b13184 4736 &dev_attr_features.attr,
dfc5606d 4737 &dev_attr_major.attr,
dd82fff1 4738 &dev_attr_minor.attr,
005a07bf 4739 &dev_attr_client_addr.attr,
dfc5606d 4740 &dev_attr_client_id.attr,
267fb90b 4741 &dev_attr_cluster_fsid.attr,
0d6d1e9c 4742 &dev_attr_config_info.attr,
dfc5606d 4743 &dev_attr_pool.attr,
9bb2f334 4744 &dev_attr_pool_id.attr,
dfc5606d 4745 &dev_attr_name.attr,
589d30e0 4746 &dev_attr_image_id.attr,
dfc5606d 4747 &dev_attr_current_snap.attr,
92a58671 4748 &dev_attr_snap_id.attr,
86b00e0d 4749 &dev_attr_parent.attr,
dfc5606d 4750 &dev_attr_refresh.attr,
dfc5606d
YS
4751 NULL
4752};
4753
4754static struct attribute_group rbd_attr_group = {
4755 .attrs = rbd_attrs,
4756};
4757
4758static const struct attribute_group *rbd_attr_groups[] = {
4759 &rbd_attr_group,
4760 NULL
4761};
4762
6cac4695 4763static void rbd_dev_release(struct device *dev);
dfc5606d
YS
4764
4765static struct device_type rbd_device_type = {
4766 .name = "rbd",
4767 .groups = rbd_attr_groups,
6cac4695 4768 .release = rbd_dev_release,
dfc5606d
YS
4769};
4770
8b8fb99c
AE
4771static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4772{
4773 kref_get(&spec->kref);
4774
4775 return spec;
4776}
4777
4778static void rbd_spec_free(struct kref *kref);
4779static void rbd_spec_put(struct rbd_spec *spec)
4780{
4781 if (spec)
4782 kref_put(&spec->kref, rbd_spec_free);
4783}
4784
4785static struct rbd_spec *rbd_spec_alloc(void)
4786{
4787 struct rbd_spec *spec;
4788
4789 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4790 if (!spec)
4791 return NULL;
04077599
ID
4792
4793 spec->pool_id = CEPH_NOPOOL;
4794 spec->snap_id = CEPH_NOSNAP;
8b8fb99c
AE
4795 kref_init(&spec->kref);
4796
8b8fb99c
AE
4797 return spec;
4798}
4799
4800static void rbd_spec_free(struct kref *kref)
4801{
4802 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4803
4804 kfree(spec->pool_name);
4805 kfree(spec->image_id);
4806 kfree(spec->image_name);
4807 kfree(spec->snap_name);
4808 kfree(spec);
4809}
4810
1643dfa4 4811static void rbd_dev_free(struct rbd_device *rbd_dev)
dd5ac32d 4812{
99d16943 4813 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
ed95b21a 4814 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
99d16943 4815
c41d13a3 4816 ceph_oid_destroy(&rbd_dev->header_oid);
6b6dddbe 4817 ceph_oloc_destroy(&rbd_dev->header_oloc);
0d6d1e9c 4818 kfree(rbd_dev->config_info);
c41d13a3 4819
dd5ac32d
ID
4820 rbd_put_client(rbd_dev->rbd_client);
4821 rbd_spec_put(rbd_dev->spec);
4822 kfree(rbd_dev->opts);
4823 kfree(rbd_dev);
1643dfa4
ID
4824}
4825
4826static void rbd_dev_release(struct device *dev)
4827{
4828 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4829 bool need_put = !!rbd_dev->opts;
4830
4831 if (need_put) {
4832 destroy_workqueue(rbd_dev->task_wq);
4833 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4834 }
4835
4836 rbd_dev_free(rbd_dev);
dd5ac32d
ID
4837
4838 /*
4839 * This is racy, but way better than putting module outside of
4840 * the release callback. The race window is pretty small, so
4841 * doing something similar to dm (dm-builtin.c) is overkill.
4842 */
4843 if (need_put)
4844 module_put(THIS_MODULE);
4845}
4846
1643dfa4
ID
4847static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4848 struct rbd_spec *spec)
c53d5893
AE
4849{
4850 struct rbd_device *rbd_dev;
4851
1643dfa4 4852 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
c53d5893
AE
4853 if (!rbd_dev)
4854 return NULL;
4855
4856 spin_lock_init(&rbd_dev->lock);
4857 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
4858 init_rwsem(&rbd_dev->header_rwsem);
4859
c41d13a3 4860 ceph_oid_init(&rbd_dev->header_oid);
922dab61 4861 ceph_oloc_init(&rbd_dev->header_oloc);
c41d13a3 4862
99d16943
ID
4863 mutex_init(&rbd_dev->watch_mutex);
4864 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4865 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4866
ed95b21a
ID
4867 init_rwsem(&rbd_dev->lock_rwsem);
4868 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4869 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4870 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4871 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4872 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4873 init_waitqueue_head(&rbd_dev->lock_waitq);
4874
dd5ac32d
ID
4875 rbd_dev->dev.bus = &rbd_bus_type;
4876 rbd_dev->dev.type = &rbd_device_type;
4877 rbd_dev->dev.parent = &rbd_root_dev;
dd5ac32d
ID
4878 device_initialize(&rbd_dev->dev);
4879
c53d5893 4880 rbd_dev->rbd_client = rbdc;
d147543d 4881 rbd_dev->spec = spec;
0903e875 4882
7627151e
YZ
4883 rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
4884 rbd_dev->layout.stripe_count = 1;
4885 rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER;
4886 rbd_dev->layout.pool_id = spec->pool_id;
30c156d9 4887 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
0903e875 4888
1643dfa4
ID
4889 return rbd_dev;
4890}
4891
4892/*
4893 * Create a mapping rbd_dev.
4894 */
4895static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4896 struct rbd_spec *spec,
4897 struct rbd_options *opts)
4898{
4899 struct rbd_device *rbd_dev;
4900
4901 rbd_dev = __rbd_dev_create(rbdc, spec);
4902 if (!rbd_dev)
4903 return NULL;
4904
4905 rbd_dev->opts = opts;
4906
4907 /* get an id and fill in device name */
4908 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4909 minor_to_rbd_dev_id(1 << MINORBITS),
4910 GFP_KERNEL);
4911 if (rbd_dev->dev_id < 0)
4912 goto fail_rbd_dev;
4913
4914 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4915 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4916 rbd_dev->name);
4917 if (!rbd_dev->task_wq)
4918 goto fail_dev_id;
dd5ac32d 4919
1643dfa4
ID
4920 /* we have a ref from do_rbd_add() */
4921 __module_get(THIS_MODULE);
4922
4923 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
c53d5893 4924 return rbd_dev;
1643dfa4
ID
4925
4926fail_dev_id:
4927 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4928fail_rbd_dev:
4929 rbd_dev_free(rbd_dev);
4930 return NULL;
c53d5893
AE
4931}
4932
4933static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4934{
dd5ac32d
ID
4935 if (rbd_dev)
4936 put_device(&rbd_dev->dev);
c53d5893
AE
4937}
4938
9d475de5
AE
4939/*
4940 * Get the size and object order for an image snapshot, or if
4941 * snap_id is CEPH_NOSNAP, gets this information for the base
4942 * image.
4943 */
4944static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4945 u8 *order, u64 *snap_size)
4946{
4947 __le64 snapid = cpu_to_le64(snap_id);
4948 int ret;
4949 struct {
4950 u8 order;
4951 __le64 size;
4952 } __attribute__ ((packed)) size_buf = { 0 };
4953
c41d13a3 4954 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
9d475de5 4955 "rbd", "get_size",
4157976b 4956 &snapid, sizeof (snapid),
e2a58ee5 4957 &size_buf, sizeof (size_buf));
36be9a76 4958 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
4959 if (ret < 0)
4960 return ret;
57385b51
AE
4961 if (ret < sizeof (size_buf))
4962 return -ERANGE;
9d475de5 4963
c3545579 4964 if (order) {
c86f86e9 4965 *order = size_buf.order;
c3545579
JD
4966 dout(" order %u", (unsigned int)*order);
4967 }
9d475de5
AE
4968 *snap_size = le64_to_cpu(size_buf.size);
4969
c3545579
JD
4970 dout(" snap_id 0x%016llx snap_size = %llu\n",
4971 (unsigned long long)snap_id,
57385b51 4972 (unsigned long long)*snap_size);
9d475de5
AE
4973
4974 return 0;
4975}
4976
4977static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4978{
4979 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4980 &rbd_dev->header.obj_order,
4981 &rbd_dev->header.image_size);
4982}
4983
1e130199
AE
4984static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4985{
4986 void *reply_buf;
4987 int ret;
4988 void *p;
4989
4990 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4991 if (!reply_buf)
4992 return -ENOMEM;
4993
c41d13a3 4994 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
4157976b 4995 "rbd", "get_object_prefix", NULL, 0,
e2a58ee5 4996 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 4997 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
4998 if (ret < 0)
4999 goto out;
5000
5001 p = reply_buf;
5002 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
5003 p + ret, NULL, GFP_NOIO);
5004 ret = 0;
1e130199
AE
5005
5006 if (IS_ERR(rbd_dev->header.object_prefix)) {
5007 ret = PTR_ERR(rbd_dev->header.object_prefix);
5008 rbd_dev->header.object_prefix = NULL;
5009 } else {
5010 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
5011 }
1e130199
AE
5012out:
5013 kfree(reply_buf);
5014
5015 return ret;
5016}
5017
b1b5402a
AE
5018static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5019 u64 *snap_features)
5020{
5021 __le64 snapid = cpu_to_le64(snap_id);
5022 struct {
5023 __le64 features;
5024 __le64 incompat;
4157976b 5025 } __attribute__ ((packed)) features_buf = { 0 };
d3767f0f 5026 u64 unsup;
b1b5402a
AE
5027 int ret;
5028
c41d13a3 5029 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
b1b5402a 5030 "rbd", "get_features",
4157976b 5031 &snapid, sizeof (snapid),
e2a58ee5 5032 &features_buf, sizeof (features_buf));
36be9a76 5033 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
5034 if (ret < 0)
5035 return ret;
57385b51
AE
5036 if (ret < sizeof (features_buf))
5037 return -ERANGE;
d889140c 5038
d3767f0f
ID
5039 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5040 if (unsup) {
5041 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5042 unsup);
b8f5c6ed 5043 return -ENXIO;
d3767f0f 5044 }
d889140c 5045
b1b5402a
AE
5046 *snap_features = le64_to_cpu(features_buf.features);
5047
5048 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
5049 (unsigned long long)snap_id,
5050 (unsigned long long)*snap_features,
5051 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
5052
5053 return 0;
5054}
5055
5056static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5057{
5058 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5059 &rbd_dev->header.features);
5060}
5061
86b00e0d
AE
5062static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5063{
5064 struct rbd_spec *parent_spec;
5065 size_t size;
5066 void *reply_buf = NULL;
5067 __le64 snapid;
5068 void *p;
5069 void *end;
642a2537 5070 u64 pool_id;
86b00e0d 5071 char *image_id;
3b5cf2a2 5072 u64 snap_id;
86b00e0d 5073 u64 overlap;
86b00e0d
AE
5074 int ret;
5075
5076 parent_spec = rbd_spec_alloc();
5077 if (!parent_spec)
5078 return -ENOMEM;
5079
5080 size = sizeof (__le64) + /* pool_id */
5081 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
5082 sizeof (__le64) + /* snap_id */
5083 sizeof (__le64); /* overlap */
5084 reply_buf = kmalloc(size, GFP_KERNEL);
5085 if (!reply_buf) {
5086 ret = -ENOMEM;
5087 goto out_err;
5088 }
5089
4d9b67cd 5090 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
c41d13a3 5091 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
86b00e0d 5092 "rbd", "get_parent",
4157976b 5093 &snapid, sizeof (snapid),
e2a58ee5 5094 reply_buf, size);
36be9a76 5095 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
5096 if (ret < 0)
5097 goto out_err;
5098
86b00e0d 5099 p = reply_buf;
57385b51
AE
5100 end = reply_buf + ret;
5101 ret = -ERANGE;
642a2537 5102 ceph_decode_64_safe(&p, end, pool_id, out_err);
392a9dad
AE
5103 if (pool_id == CEPH_NOPOOL) {
5104 /*
5105 * Either the parent never existed, or we have
5106 * record of it but the image got flattened so it no
5107 * longer has a parent. When the parent of a
5108 * layered image disappears we immediately set the
5109 * overlap to 0. The effect of this is that all new
5110 * requests will be treated as if the image had no
5111 * parent.
5112 */
5113 if (rbd_dev->parent_overlap) {
5114 rbd_dev->parent_overlap = 0;
392a9dad
AE
5115 rbd_dev_parent_put(rbd_dev);
5116 pr_info("%s: clone image has been flattened\n",
5117 rbd_dev->disk->disk_name);
5118 }
5119
86b00e0d 5120 goto out; /* No parent? No problem. */
392a9dad 5121 }
86b00e0d 5122
0903e875
AE
5123 /* The ceph file layout needs to fit pool id in 32 bits */
5124
5125 ret = -EIO;
642a2537 5126 if (pool_id > (u64)U32_MAX) {
9584d508 5127 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
642a2537 5128 (unsigned long long)pool_id, U32_MAX);
57385b51 5129 goto out_err;
c0cd10db 5130 }
0903e875 5131
979ed480 5132 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
5133 if (IS_ERR(image_id)) {
5134 ret = PTR_ERR(image_id);
5135 goto out_err;
5136 }
3b5cf2a2 5137 ceph_decode_64_safe(&p, end, snap_id, out_err);
86b00e0d
AE
5138 ceph_decode_64_safe(&p, end, overlap, out_err);
5139
3b5cf2a2
AE
5140 /*
5141 * The parent won't change (except when the clone is
5142 * flattened, which is already handled above). So we only need
5143 * to record the parent spec if we have not already done so.
5144 */
5145 if (!rbd_dev->parent_spec) {
5146 parent_spec->pool_id = pool_id;
5147 parent_spec->image_id = image_id;
5148 parent_spec->snap_id = snap_id;
70cf49cf
AE
5149 rbd_dev->parent_spec = parent_spec;
5150 parent_spec = NULL; /* rbd_dev now owns this */
fbba11b3
ID
5151 } else {
5152 kfree(image_id);
3b5cf2a2
AE
5153 }
5154
5155 /*
cf32bd9c
ID
5156 * We always update the parent overlap. If it's zero we issue
5157 * a warning, as we will proceed as if there was no parent.
3b5cf2a2 5158 */
3b5cf2a2 5159 if (!overlap) {
3b5cf2a2 5160 if (parent_spec) {
cf32bd9c
ID
5161 /* refresh, careful to warn just once */
5162 if (rbd_dev->parent_overlap)
5163 rbd_warn(rbd_dev,
5164 "clone now standalone (overlap became 0)");
3b5cf2a2 5165 } else {
cf32bd9c
ID
5166 /* initial probe */
5167 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
3b5cf2a2 5168 }
70cf49cf 5169 }
cf32bd9c
ID
5170 rbd_dev->parent_overlap = overlap;
5171
86b00e0d
AE
5172out:
5173 ret = 0;
5174out_err:
5175 kfree(reply_buf);
5176 rbd_spec_put(parent_spec);
5177
5178 return ret;
5179}
5180
cc070d59
AE
5181static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5182{
5183 struct {
5184 __le64 stripe_unit;
5185 __le64 stripe_count;
5186 } __attribute__ ((packed)) striping_info_buf = { 0 };
5187 size_t size = sizeof (striping_info_buf);
5188 void *p;
5189 u64 obj_size;
5190 u64 stripe_unit;
5191 u64 stripe_count;
5192 int ret;
5193
c41d13a3 5194 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
cc070d59 5195 "rbd", "get_stripe_unit_count", NULL, 0,
e2a58ee5 5196 (char *)&striping_info_buf, size);
cc070d59
AE
5197 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5198 if (ret < 0)
5199 return ret;
5200 if (ret < size)
5201 return -ERANGE;
5202
5203 /*
5204 * We don't actually support the "fancy striping" feature
5205 * (STRIPINGV2) yet, but if the striping sizes are the
5206 * defaults the behavior is the same as before. So find
5207 * out, and only fail if the image has non-default values.
5208 */
5209 ret = -EINVAL;
5210 obj_size = (u64)1 << rbd_dev->header.obj_order;
5211 p = &striping_info_buf;
5212 stripe_unit = ceph_decode_64(&p);
5213 if (stripe_unit != obj_size) {
5214 rbd_warn(rbd_dev, "unsupported stripe unit "
5215 "(got %llu want %llu)",
5216 stripe_unit, obj_size);
5217 return -EINVAL;
5218 }
5219 stripe_count = ceph_decode_64(&p);
5220 if (stripe_count != 1) {
5221 rbd_warn(rbd_dev, "unsupported stripe count "
5222 "(got %llu want 1)", stripe_count);
5223 return -EINVAL;
5224 }
500d0c0f
AE
5225 rbd_dev->header.stripe_unit = stripe_unit;
5226 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
5227
5228 return 0;
5229}
5230
9e15b77d
AE
5231static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5232{
5233 size_t image_id_size;
5234 char *image_id;
5235 void *p;
5236 void *end;
5237 size_t size;
5238 void *reply_buf = NULL;
5239 size_t len = 0;
5240 char *image_name = NULL;
5241 int ret;
5242
5243 rbd_assert(!rbd_dev->spec->image_name);
5244
69e7a02f
AE
5245 len = strlen(rbd_dev->spec->image_id);
5246 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
5247 image_id = kmalloc(image_id_size, GFP_KERNEL);
5248 if (!image_id)
5249 return NULL;
5250
5251 p = image_id;
4157976b 5252 end = image_id + image_id_size;
57385b51 5253 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
5254
5255 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5256 reply_buf = kmalloc(size, GFP_KERNEL);
5257 if (!reply_buf)
5258 goto out;
5259
36be9a76 5260 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
9e15b77d
AE
5261 "rbd", "dir_get_name",
5262 image_id, image_id_size,
e2a58ee5 5263 reply_buf, size);
9e15b77d
AE
5264 if (ret < 0)
5265 goto out;
5266 p = reply_buf;
f40eb349
AE
5267 end = reply_buf + ret;
5268
9e15b77d
AE
5269 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5270 if (IS_ERR(image_name))
5271 image_name = NULL;
5272 else
5273 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5274out:
5275 kfree(reply_buf);
5276 kfree(image_id);
5277
5278 return image_name;
5279}
5280
2ad3d716
AE
5281static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5282{
5283 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5284 const char *snap_name;
5285 u32 which = 0;
5286
5287 /* Skip over names until we find the one we are looking for */
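	/*
	 * The format 1 snapshot names are stored back to back as
	 * NUL-terminated strings, in the same order as the ids in
	 * snapc->snaps[], which is why a simple linear walk works here.
	 */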
5288
5289 snap_name = rbd_dev->header.snap_names;
5290 while (which < snapc->num_snaps) {
5291 if (!strcmp(name, snap_name))
5292 return snapc->snaps[which];
5293 snap_name += strlen(snap_name) + 1;
5294 which++;
5295 }
5296 return CEPH_NOSNAP;
5297}
5298
5299static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5300{
5301 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5302 u32 which;
5303 bool found = false;
5304 u64 snap_id;
5305
5306 for (which = 0; !found && which < snapc->num_snaps; which++) {
5307 const char *snap_name;
5308
5309 snap_id = snapc->snaps[which];
5310 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
efadc98a
JD
5311 if (IS_ERR(snap_name)) {
5312 /* ignore no-longer existing snapshots */
5313 if (PTR_ERR(snap_name) == -ENOENT)
5314 continue;
5315 else
5316 break;
5317 }
2ad3d716
AE
5318 found = !strcmp(name, snap_name);
5319 kfree(snap_name);
5320 }
5321 return found ? snap_id : CEPH_NOSNAP;
5322}
5323
5324/*
5325 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5326 * no snapshot by that name is found, or if an error occurs.
5327 */
5328static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5329{
5330 if (rbd_dev->image_format == 1)
5331 return rbd_v1_snap_id_by_name(rbd_dev, name);
5332
5333 return rbd_v2_snap_id_by_name(rbd_dev, name);
5334}
5335
9e15b77d 5336/*
04077599
ID
5337 * An image being mapped will have everything but the snap id.
5338 */
5339static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5340{
5341 struct rbd_spec *spec = rbd_dev->spec;
5342
5343 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5344 rbd_assert(spec->image_id && spec->image_name);
5345 rbd_assert(spec->snap_name);
5346
5347 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5348 u64 snap_id;
5349
5350 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5351 if (snap_id == CEPH_NOSNAP)
5352 return -ENOENT;
5353
5354 spec->snap_id = snap_id;
5355 } else {
5356 spec->snap_id = CEPH_NOSNAP;
5357 }
5358
5359 return 0;
5360}
5361
5362/*
5363 * A parent image will have all ids but none of the names.
e1d4213f 5364 *
04077599
ID
5365 * All names in an rbd spec are dynamically allocated. It's OK if we
5366 * can't figure out the name for an image id.
9e15b77d 5367 */
04077599 5368static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
9e15b77d 5369{
2e9f7f1c
AE
5370 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5371 struct rbd_spec *spec = rbd_dev->spec;
5372 const char *pool_name;
5373 const char *image_name;
5374 const char *snap_name;
9e15b77d
AE
5375 int ret;
5376
04077599
ID
5377 rbd_assert(spec->pool_id != CEPH_NOPOOL);
5378 rbd_assert(spec->image_id);
5379 rbd_assert(spec->snap_id != CEPH_NOSNAP);
9e15b77d 5380
2e9f7f1c 5381 /* Get the pool name; we have to make our own copy of this */
9e15b77d 5382
2e9f7f1c
AE
5383 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5384 if (!pool_name) {
5385 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
5386 return -EIO;
5387 }
2e9f7f1c
AE
5388 pool_name = kstrdup(pool_name, GFP_KERNEL);
5389 if (!pool_name)
9e15b77d
AE
5390 return -ENOMEM;
5391
5392 /* Fetch the image name; tolerate failure here */
5393
2e9f7f1c
AE
5394 image_name = rbd_dev_image_name(rbd_dev);
5395 if (!image_name)
06ecc6cb 5396 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 5397
04077599 5398 /* Fetch the snapshot name */
9e15b77d 5399
2e9f7f1c 5400 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
da6a6b63
JD
5401 if (IS_ERR(snap_name)) {
5402 ret = PTR_ERR(snap_name);
9e15b77d 5403 goto out_err;
2e9f7f1c
AE
5404 }
5405
5406 spec->pool_name = pool_name;
5407 spec->image_name = image_name;
5408 spec->snap_name = snap_name;
9e15b77d
AE
5409
5410 return 0;
04077599 5411
9e15b77d 5412out_err:
2e9f7f1c
AE
5413 kfree(image_name);
5414 kfree(pool_name);
9e15b77d
AE
5415 return ret;
5416}
5417
cc4a38bd 5418static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
5419{
5420 size_t size;
5421 int ret;
5422 void *reply_buf;
5423 void *p;
5424 void *end;
5425 u64 seq;
5426 u32 snap_count;
5427 struct ceph_snap_context *snapc;
5428 u32 i;
5429
5430 /*
5431 * We'll need room for the seq value (maximum snapshot id),
5432 * snapshot count, and array of that many snapshot ids.
5433 * For now we have a fixed upper limit on the number we're
5434 * prepared to receive.
5435 */
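	/*
	 * With sizeof (__le64) == 8 and sizeof (__le32) == 4 this works
	 * out to 8 + 4 + RBD_MAX_SNAP_COUNT * 8 = 4092 bytes, so the
	 * reply buffer still fits in a single 4 KiB allocation.
	 */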
5436 size = sizeof (__le64) + sizeof (__le32) +
5437 RBD_MAX_SNAP_COUNT * sizeof (__le64);
5438 reply_buf = kzalloc(size, GFP_KERNEL);
5439 if (!reply_buf)
5440 return -ENOMEM;
5441
c41d13a3 5442 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
4157976b 5443 "rbd", "get_snapcontext", NULL, 0,
e2a58ee5 5444 reply_buf, size);
36be9a76 5445 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
5446 if (ret < 0)
5447 goto out;
5448
35d489f9 5449 p = reply_buf;
57385b51
AE
5450 end = reply_buf + ret;
5451 ret = -ERANGE;
35d489f9
AE
5452 ceph_decode_64_safe(&p, end, seq, out);
5453 ceph_decode_32_safe(&p, end, snap_count, out);
5454
5455 /*
5456 * Make sure the reported number of snapshot ids wouldn't go
5457 * beyond the end of our buffer. But before checking that,
5458 * make sure the computed size of the snapshot context we
5459 * allocate is representable in a size_t.
5460 */
5461 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5462 / sizeof (u64)) {
5463 ret = -EINVAL;
5464 goto out;
5465 }
5466 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5467 goto out;
468521c1 5468 ret = 0;
35d489f9 5469
812164f8 5470 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
5471 if (!snapc) {
5472 ret = -ENOMEM;
5473 goto out;
5474 }
35d489f9 5475 snapc->seq = seq;
35d489f9
AE
5476 for (i = 0; i < snap_count; i++)
5477 snapc->snaps[i] = ceph_decode_64(&p);
5478
49ece554 5479 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
5480 rbd_dev->header.snapc = snapc;
5481
5482 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 5483 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
5484out:
5485 kfree(reply_buf);
5486
57385b51 5487 return ret;
35d489f9
AE
5488}
5489
54cac61f
AE
5490static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5491 u64 snap_id)
b8b1e2db
AE
5492{
5493 size_t size;
5494 void *reply_buf;
54cac61f 5495 __le64 snapid;
b8b1e2db
AE
5496 int ret;
5497 void *p;
5498 void *end;
b8b1e2db
AE
5499 char *snap_name;
5500
5501 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5502 reply_buf = kmalloc(size, GFP_KERNEL);
5503 if (!reply_buf)
5504 return ERR_PTR(-ENOMEM);
5505
54cac61f 5506 snapid = cpu_to_le64(snap_id);
c41d13a3 5507 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
b8b1e2db 5508 "rbd", "get_snapshot_name",
54cac61f 5509 &snapid, sizeof (snapid),
e2a58ee5 5510 reply_buf, size);
36be9a76 5511 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
5512 if (ret < 0) {
5513 snap_name = ERR_PTR(ret);
b8b1e2db 5514 goto out;
f40eb349 5515 }
b8b1e2db
AE
5516
5517 p = reply_buf;
f40eb349 5518 end = reply_buf + ret;
e5c35534 5519 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 5520 if (IS_ERR(snap_name))
b8b1e2db 5521 goto out;
b8b1e2db 5522
f40eb349 5523 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 5524 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
5525out:
5526 kfree(reply_buf);
5527
f40eb349 5528 return snap_name;
b8b1e2db
AE
5529}
5530
2df3fac7 5531static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 5532{
2df3fac7 5533 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 5534 int ret;
117973fb 5535
1617e40c
JD
5536 ret = rbd_dev_v2_image_size(rbd_dev);
5537 if (ret)
cfbf6377 5538 return ret;
1617e40c 5539
2df3fac7
AE
5540 if (first_time) {
5541 ret = rbd_dev_v2_header_onetime(rbd_dev);
5542 if (ret)
cfbf6377 5543 return ret;
2df3fac7
AE
5544 }
5545
cc4a38bd 5546 ret = rbd_dev_v2_snap_context(rbd_dev);
d194cd1d
ID
5547 if (ret && first_time) {
5548 kfree(rbd_dev->header.object_prefix);
5549 rbd_dev->header.object_prefix = NULL;
5550 }
117973fb
AE
5551
5552 return ret;
5553}
5554
a720ae09
ID
5555static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5556{
5557 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5558
5559 if (rbd_dev->image_format == 1)
5560 return rbd_dev_v1_header_info(rbd_dev);
5561
5562 return rbd_dev_v2_header_info(rbd_dev);
5563}
5564
e28fff26
AE
5565/*
5566 * Skips over white space at *buf, and updates *buf to point to the
5567 * first found non-space character (if any). Returns the length of
593a9e7b
AE
5568 * the token (string of non-white space characters) found. Note
5569 * that *buf must be terminated with '\0'.
e28fff26
AE
5570 */
5571static inline size_t next_token(const char **buf)
5572{
5573 /*
5574 * These are the characters that produce nonzero for
5575 * isspace() in the "C" and "POSIX" locales.
5576 */
5577 const char *spaces = " \f\n\r\t\v";
5578
5579 *buf += strspn(*buf, spaces); /* Find start of token */
5580
5581 return strcspn(*buf, spaces); /* Return token length */
5582}
5583
ea3352f4
AE
5584/*
5585 * Finds the next token in *buf, dynamically allocates a buffer big
5586 * enough to hold a copy of it, and copies the token into the new
5587 * buffer. The copy is guaranteed to be terminated with '\0'. Note
5588 * that a duplicate buffer is created even for a zero-length token.
5589 *
5590 * Returns a pointer to the newly-allocated duplicate, or a null
5591 * pointer if memory for the duplicate was not available. If
5592 * the lenp argument is a non-null pointer, the length of the token
5593 * (not including the '\0') is returned in *lenp.
5594 *
5595 * If successful, the *buf pointer will be updated to point beyond
5596 * the end of the found token.
5597 *
5598 * Note: uses GFP_KERNEL for allocation.
5599 */
5600static inline char *dup_token(const char **buf, size_t *lenp)
5601{
5602 char *dup;
5603 size_t len;
5604
5605 len = next_token(buf);
4caf35f9 5606 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
5607 if (!dup)
5608 return NULL;
ea3352f4
AE
5609 *(dup + len) = '\0';
5610 *buf += len;
5611
5612 if (lenp)
5613 *lenp = len;
5614
5615 return dup;
5616}
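/*
 * For example (using an arbitrary mapping string): given
 * buf = "1.2.3.4:6789 name=admin rbd myimage", next_token(&buf) leaves buf
 * where it is (there is no leading whitespace) and returns 12, the length
 * of "1.2.3.4:6789"; dup_token(&buf, &len) would then return a
 * NUL-terminated copy of that token and advance buf just past it.
 */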
5617
a725f65e 5618/*
859c31df
AE
5619 * Parse the options provided for an "rbd add" (i.e., rbd image
5620 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
5621 * and the data written is passed here via a NUL-terminated buffer.
5622 * Returns 0 if successful or an error code otherwise.
d22f76e7 5623 *
859c31df
AE
5624 * The information extracted from these options is recorded in
5625 * the other parameters which return dynamically-allocated
5626 * structures:
5627 * ceph_opts
5628 * The address of a pointer that will refer to a ceph options
5629 * structure. Caller must release the returned pointer using
5630 * ceph_destroy_options() when it is no longer needed.
5631 * rbd_opts
5632 * Address of an rbd options pointer. Fully initialized by
5633 * this function; caller must release with kfree().
5634 * spec
5635 * Address of an rbd image specification pointer. Fully
5636 * initialized by this function based on parsed options.
5637 * Caller must release with rbd_spec_put().
5638 *
5639 * The options passed take this form:
5640 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
5641 * where:
5642 * <mon_addrs>
5643 * A comma-separated list of one or more monitor addresses.
5644 * A monitor address is an ip address, optionally followed
5645 * by a port number (separated by a colon).
5646 * I.e.: ip1[:port1][,ip2[:port2]...]
5647 * <options>
5648 * A comma-separated list of ceph and/or rbd options.
5649 * <pool_name>
5650 * The name of the rados pool containing the rbd image.
5651 * <image_name>
5652 * The name of the image in that pool to map.
5653 * <snap_name>
5654 * An optional snapshot name. If provided, the mapping will
5655 * present data from the image at the time that snapshot was
5656 * created. The image head is used if no snapshot name is
5657 * provided. Snapshot mappings are always read-only.
a725f65e 5658 */
859c31df 5659static int rbd_add_parse_args(const char *buf,
dc79b113 5660 struct ceph_options **ceph_opts,
859c31df
AE
5661 struct rbd_options **opts,
5662 struct rbd_spec **rbd_spec)
e28fff26 5663{
d22f76e7 5664 size_t len;
859c31df 5665 char *options;
0ddebc0c 5666 const char *mon_addrs;
ecb4dc22 5667 char *snap_name;
0ddebc0c 5668 size_t mon_addrs_size;
859c31df 5669 struct rbd_spec *spec = NULL;
4e9afeba 5670 struct rbd_options *rbd_opts = NULL;
859c31df 5671 struct ceph_options *copts;
dc79b113 5672 int ret;
e28fff26
AE
5673
5674 /* The first four tokens are required */
5675
7ef3214a 5676 len = next_token(&buf);
4fb5d671
AE
5677 if (!len) {
5678 rbd_warn(NULL, "no monitor address(es) provided");
5679 return -EINVAL;
5680 }
0ddebc0c 5681 mon_addrs = buf;
f28e565a 5682 mon_addrs_size = len + 1;
7ef3214a 5683 buf += len;
a725f65e 5684
dc79b113 5685 ret = -EINVAL;
f28e565a
AE
5686 options = dup_token(&buf, NULL);
5687 if (!options)
dc79b113 5688 return -ENOMEM;
4fb5d671
AE
5689 if (!*options) {
5690 rbd_warn(NULL, "no options provided");
5691 goto out_err;
5692 }
e28fff26 5693
859c31df
AE
5694 spec = rbd_spec_alloc();
5695 if (!spec)
f28e565a 5696 goto out_mem;
859c31df
AE
5697
5698 spec->pool_name = dup_token(&buf, NULL);
5699 if (!spec->pool_name)
5700 goto out_mem;
4fb5d671
AE
5701 if (!*spec->pool_name) {
5702 rbd_warn(NULL, "no pool name provided");
5703 goto out_err;
5704 }
e28fff26 5705
69e7a02f 5706 spec->image_name = dup_token(&buf, NULL);
859c31df 5707 if (!spec->image_name)
f28e565a 5708 goto out_mem;
4fb5d671
AE
5709 if (!*spec->image_name) {
5710 rbd_warn(NULL, "no image name provided");
5711 goto out_err;
5712 }
d4b125e9 5713
f28e565a
AE
5714 /*
5715 * Snapshot name is optional; default is to use "-"
5716 * (indicating the head/no snapshot).
5717 */
3feeb894 5718 len = next_token(&buf);
820a5f3e 5719 if (!len) {
3feeb894
AE
5720 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5721 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 5722 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 5723 ret = -ENAMETOOLONG;
f28e565a 5724 goto out_err;
849b4260 5725 }
ecb4dc22
AE
5726 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5727 if (!snap_name)
f28e565a 5728 goto out_mem;
ecb4dc22
AE
5729 *(snap_name + len) = '\0';
5730 spec->snap_name = snap_name;
e5c35534 5731
0ddebc0c 5732 /* Initialize all rbd options to the defaults */
e28fff26 5733
4e9afeba
AE
5734 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5735 if (!rbd_opts)
5736 goto out_mem;
5737
5738 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
b5584180 5739 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
80de1912 5740 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
d22f76e7 5741
859c31df 5742 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 5743 mon_addrs + mon_addrs_size - 1,
4e9afeba 5744 parse_rbd_opts_token, rbd_opts);
859c31df
AE
5745 if (IS_ERR(copts)) {
5746 ret = PTR_ERR(copts);
dc79b113
AE
5747 goto out_err;
5748 }
859c31df
AE
5749 kfree(options);
5750
5751 *ceph_opts = copts;
4e9afeba 5752 *opts = rbd_opts;
859c31df 5753 *rbd_spec = spec;
0ddebc0c 5754
dc79b113 5755 return 0;
f28e565a 5756out_mem:
dc79b113 5757 ret = -ENOMEM;
d22f76e7 5758out_err:
859c31df
AE
5759 kfree(rbd_opts);
5760 rbd_spec_put(spec);
f28e565a 5761 kfree(options);
d22f76e7 5762
dc79b113 5763 return ret;
a725f65e
AE
5764}
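As a rough userspace sketch of the format described above (the monitor
address, credentials and image name below are placeholders, and the secret
would normally come from a keyring), mapping an image comes down to writing
one such line to /sys/bus/rbd/add:

#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(void)
{
	/* "<mon_addrs> <options> <pool_name> <image_name> [<snap_name>]" */
	const char map[] = "1.2.3.4:6789 name=admin,secret=PLACEHOLDER rbd myimage";
	int fd, ret = 0;

	fd = open("/sys/bus/rbd/add", O_WRONLY);
	if (fd < 0) {
		perror("open /sys/bus/rbd/add");
		return 1;
	}
	if (write(fd, map, strlen(map)) < 0) {
		perror("rbd add");
		ret = 1;
	}
	close(fd);
	return ret;
}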
5765
30ba1f02
ID
5766/*
5767 * Return pool id (>= 0) or a negative error code.
5768 */
5769static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5770{
a319bf56 5771 struct ceph_options *opts = rbdc->client->options;
30ba1f02 5772 u64 newest_epoch;
30ba1f02
ID
5773 int tries = 0;
5774 int ret;
5775
5776again:
5777 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5778 if (ret == -ENOENT && tries++ < 1) {
d0b19705
ID
5779 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5780 &newest_epoch);
30ba1f02
ID
5781 if (ret < 0)
5782 return ret;
5783
5784 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
7cca78c9 5785 ceph_osdc_maybe_request_map(&rbdc->client->osdc);
30ba1f02 5786 (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
a319bf56
ID
5787 newest_epoch,
5788 opts->mount_timeout);
30ba1f02
ID
5789 goto again;
5790 } else {
5791 /* the osdmap we have is new enough */
5792 return -ENOENT;
5793 }
5794 }
5795
5796 return ret;
5797}
5798
589d30e0
AE
5799/*
5800 * An rbd format 2 image has a unique identifier, distinct from the
5801 * name given to it by the user. Internally, that identifier is
5802 * what's used to specify the names of objects related to the image.
5803 *
5804 * A special "rbd id" object is used to map an rbd image name to its
5805 * id. If that object doesn't exist, then there is no v2 rbd image
5806 * with the supplied name.
5807 *
5808 * This function will record the given rbd_dev's image_id field if
5809 * it can be determined, and in that case will return 0. If any
5810 * errors occur a negative errno will be returned and the rbd_dev's
5811 * image_id field will be unchanged (and should be NULL).
5812 */
5813static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5814{
5815 int ret;
5816 size_t size;
5817 char *object_name;
5818 void *response;
c0fba368 5819 char *image_id;
2f82ee54 5820
2c0d0a10
AE
5821 /*
5822 * When probing a parent image, the image id is already
5823 * known (and the image name likely is not). There's no
c0fba368
AE
5824 * need to fetch the image id again in this case. We
5825 * do still need to set the image format though.
2c0d0a10 5826 */
c0fba368
AE
5827 if (rbd_dev->spec->image_id) {
5828 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5829
2c0d0a10 5830 return 0;
c0fba368 5831 }
2c0d0a10 5832
589d30e0
AE
5833 /*
5834 * First, see if the format 2 image id file exists, and if
5835 * so, get the image's persistent id from it.
5836 */
69e7a02f 5837 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
5838 object_name = kmalloc(size, GFP_NOIO);
5839 if (!object_name)
5840 return -ENOMEM;
0d7dbfce 5841 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
5842 dout("rbd id object name is %s\n", object_name);
5843
5844 /* Response will be an encoded string, which includes a length */
5845
5846 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5847 response = kzalloc(size, GFP_NOIO);
5848 if (!response) {
5849 ret = -ENOMEM;
5850 goto out;
5851 }
5852
c0fba368
AE
5853 /* If it doesn't exist we'll assume it's a format 1 image */
5854
36be9a76 5855 ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 5856 "rbd", "get_id", NULL, 0,
e2a58ee5 5857 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 5858 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
5859 if (ret == -ENOENT) {
5860 image_id = kstrdup("", GFP_KERNEL);
5861 ret = image_id ? 0 : -ENOMEM;
5862 if (!ret)
5863 rbd_dev->image_format = 1;
7dd440c9 5864 } else if (ret >= 0) {
c0fba368
AE
5865 void *p = response;
5866
5867 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 5868 NULL, GFP_NOIO);
461f758a 5869 ret = PTR_ERR_OR_ZERO(image_id);
c0fba368
AE
5870 if (!ret)
5871 rbd_dev->image_format = 2;
c0fba368
AE
5872 }
5873
5874 if (!ret) {
5875 rbd_dev->spec->image_id = image_id;
5876 dout("image_id is %s\n", image_id);
589d30e0
AE
5877 }
5878out:
5879 kfree(response);
5880 kfree(object_name);
5881
5882 return ret;
5883}
5884
3abef3b3
AE
5885/*
5886 * Undo whatever state changes are made by a v1 or v2 header
5887 * info call.
5888 */
6fd48b3b
AE
5889static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5890{
5891 struct rbd_image_header *header;
5892
e69b8d41 5893 rbd_dev_parent_put(rbd_dev);
6fd48b3b
AE
5894
5895 /* Free dynamic fields from the header, then zero it out */
5896
5897 header = &rbd_dev->header;
812164f8 5898 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
5899 kfree(header->snap_sizes);
5900 kfree(header->snap_names);
5901 kfree(header->object_prefix);
5902 memset(header, 0, sizeof (*header));
5903}
5904
2df3fac7 5905static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9
AE
5906{
5907 int ret;
a30b71b9 5908
1e130199 5909 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 5910 if (ret)
b1b5402a
AE
5911 goto out_err;
5912
2df3fac7
AE
5913 /*
5914 * Get and check the features for the image. Currently the
5915 * features are assumed to never change.
5916 */
b1b5402a 5917 ret = rbd_dev_v2_features(rbd_dev);
57385b51 5918 if (ret)
9d475de5 5919 goto out_err;
35d489f9 5920
cc070d59
AE
5921 /* If the image supports fancy striping, get its parameters */
5922
5923 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5924 ret = rbd_dev_v2_striping_info(rbd_dev);
5925 if (ret < 0)
5926 goto out_err;
5927 }
2df3fac7 5928 /* No support for crypto and compression type format 2 images */
a30b71b9 5929
35152979 5930 return 0;
9d475de5 5931out_err:
642a2537 5932 rbd_dev->header.features = 0;
1e130199
AE
5933 kfree(rbd_dev->header.object_prefix);
5934 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
5935
5936 return ret;
a30b71b9
AE
5937}
5938
6d69bb53
ID
5939/*
5940 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5941 * rbd_dev_image_probe() recursion depth, which means it's also the
5942 * length of the already discovered part of the parent chain.
5943 */
5944static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
83a06263 5945{
2f82ee54 5946 struct rbd_device *parent = NULL;
124afba2
AE
5947 int ret;
5948
5949 if (!rbd_dev->parent_spec)
5950 return 0;
124afba2 5951
6d69bb53
ID
5952 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5953 pr_info("parent chain is too long (%d)\n", depth);
5954 ret = -EINVAL;
5955 goto out_err;
5956 }
5957
1643dfa4 5958 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
1f2c6651
ID
5959 if (!parent) {
5960 ret = -ENOMEM;
124afba2 5961 goto out_err;
1f2c6651
ID
5962 }
5963
5964 /*
5965 * Images related by parent/child relationships always share
5966 * rbd_client and spec/parent_spec, so bump their refcounts.
5967 */
5968 __rbd_get_client(rbd_dev->rbd_client);
5969 rbd_spec_get(rbd_dev->parent_spec);
124afba2 5970
6d69bb53 5971 ret = rbd_dev_image_probe(parent, depth);
124afba2
AE
5972 if (ret < 0)
5973 goto out_err;
1f2c6651 5974
124afba2 5975 rbd_dev->parent = parent;
a2acd00e 5976 atomic_set(&rbd_dev->parent_ref, 1);
124afba2 5977 return 0;
1f2c6651 5978
124afba2 5979out_err:
1f2c6651 5980 rbd_dev_unparent(rbd_dev);
1761b229 5981 rbd_dev_destroy(parent);
124afba2
AE
5982 return ret;
5983}
5984
811c6688
ID
5985/*
5986 * rbd_dev->header_rwsem must be locked for write and will be unlocked
5987 * upon return.
5988 */
200a6a8b 5989static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 5990{
83a06263 5991 int ret;
d1cf5788 5992
9b60e70b 5993 /* Record our major and minor device numbers. */
83a06263 5994
9b60e70b
ID
5995 if (!single_major) {
5996 ret = register_blkdev(0, rbd_dev->name);
5997 if (ret < 0)
1643dfa4 5998 goto err_out_unlock;
9b60e70b
ID
5999
6000 rbd_dev->major = ret;
6001 rbd_dev->minor = 0;
6002 } else {
6003 rbd_dev->major = rbd_major;
6004 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6005 }
83a06263
AE
6006
6007 /* Set up the blkdev mapping. */
6008
6009 ret = rbd_init_disk(rbd_dev);
6010 if (ret)
6011 goto err_out_blkdev;
6012
f35a4dee 6013 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
6014 if (ret)
6015 goto err_out_disk;
bc1ecc65 6016
f35a4dee 6017 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
22001f61 6018 set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
f35a4dee 6019
dd5ac32d
ID
6020 dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6021 ret = device_add(&rbd_dev->dev);
f35a4dee 6022 if (ret)
f5ee37bd 6023 goto err_out_mapping;
83a06263 6024
83a06263
AE
6025 /* Everything's ready. Announce the disk to the world. */
6026
129b79d4 6027 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
811c6688 6028 up_write(&rbd_dev->header_rwsem);
83a06263 6029
1643dfa4
ID
6030 spin_lock(&rbd_dev_list_lock);
6031 list_add_tail(&rbd_dev->node, &rbd_dev_list);
6032 spin_unlock(&rbd_dev_list_lock);
6033
811c6688 6034 add_disk(rbd_dev->disk);
ca7909e8
ID
6035 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6036 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6037 rbd_dev->header.features);
83a06263
AE
6038
6039 return ret;
2f82ee54 6040
f35a4dee
AE
6041err_out_mapping:
6042 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
6043err_out_disk:
6044 rbd_free_disk(rbd_dev);
6045err_out_blkdev:
9b60e70b
ID
6046 if (!single_major)
6047 unregister_blkdev(rbd_dev->major, rbd_dev->name);
811c6688
ID
6048err_out_unlock:
6049 up_write(&rbd_dev->header_rwsem);
83a06263
AE
6050 return ret;
6051}
6052
332bb12d
AE
6053static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6054{
6055 struct rbd_spec *spec = rbd_dev->spec;
c41d13a3 6056 int ret;
332bb12d
AE
6057
6058 /* Record the header object name for this rbd image. */
6059
6060 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6061
7627151e 6062 rbd_dev->header_oloc.pool = rbd_dev->layout.pool_id;
332bb12d 6063 if (rbd_dev->image_format == 1)
c41d13a3
ID
6064 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6065 spec->image_name, RBD_SUFFIX);
332bb12d 6066 else
c41d13a3
ID
6067 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6068 RBD_HEADER_PREFIX, spec->image_id);
332bb12d 6069
c41d13a3 6070 return ret;
332bb12d
AE
6071}
6072
200a6a8b
AE
6073static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6074{
6fd48b3b 6075 rbd_dev_unprobe(rbd_dev);
6fd48b3b
AE
6076 rbd_dev->image_format = 0;
6077 kfree(rbd_dev->spec->image_id);
6078 rbd_dev->spec->image_id = NULL;
6079
200a6a8b
AE
6080 rbd_dev_destroy(rbd_dev);
6081}
6082
a30b71b9
AE
6083/*
6084 * Probe for the existence of the header object for the given rbd
1f3ef788
AE
6085 * device. If this image is the one being mapped (i.e., not a
6086 * parent), initiate a watch on its header object before using that
6087 * object to get detailed information about the rbd image.
a30b71b9 6088 */
6d69bb53 6089static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
a30b71b9
AE
6090{
6091 int ret;
6092
6093 /*
3abef3b3
AE
6094 * Get the id from the image id object. Unless there's an
6095 * error, rbd_dev->spec->image_id will be filled in with
6096 * a dynamically-allocated string, and rbd_dev->image_format
6097 * will be set to either 1 or 2.
a30b71b9
AE
6098 */
6099 ret = rbd_dev_image_id(rbd_dev);
6100 if (ret)
c0fba368 6101 return ret;
c0fba368 6102
332bb12d
AE
6103 ret = rbd_dev_header_name(rbd_dev);
6104 if (ret)
6105 goto err_out_format;
6106
6d69bb53 6107 if (!depth) {
99d16943 6108 ret = rbd_register_watch(rbd_dev);
1fe48023
ID
6109 if (ret) {
6110 if (ret == -ENOENT)
6111 pr_info("image %s/%s does not exist\n",
6112 rbd_dev->spec->pool_name,
6113 rbd_dev->spec->image_name);
c41d13a3 6114 goto err_out_format;
1fe48023 6115 }
1f3ef788 6116 }
b644de2b 6117
a720ae09 6118 ret = rbd_dev_header_info(rbd_dev);
5655c4d9 6119 if (ret)
b644de2b 6120 goto err_out_watch;
83a06263 6121
04077599
ID
6122 /*
6123 * If this image is the one being mapped, we have pool name and
6124 * id, image name and id, and snap name - need to fill snap id.
6125 * Otherwise this is a parent image, identified by pool, image
6126 * and snap ids - need to fill in names for those ids.
6127 */
6d69bb53 6128 if (!depth)
04077599
ID
6129 ret = rbd_spec_fill_snap_id(rbd_dev);
6130 else
6131 ret = rbd_spec_fill_names(rbd_dev);
1fe48023
ID
6132 if (ret) {
6133 if (ret == -ENOENT)
6134 pr_info("snap %s/%s@%s does not exist\n",
6135 rbd_dev->spec->pool_name,
6136 rbd_dev->spec->image_name,
6137 rbd_dev->spec->snap_name);
33dca39f 6138 goto err_out_probe;
1fe48023 6139 }
9bb81c9b 6140
e8f59b59
ID
6141 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6142 ret = rbd_dev_v2_parent_info(rbd_dev);
6143 if (ret)
6144 goto err_out_probe;
6145
6146 /*
6147 * Need to warn users if this image is the one being
6148 * mapped and has a parent.
6149 */
6d69bb53 6150 if (!depth && rbd_dev->parent_spec)
e8f59b59
ID
6151 rbd_warn(rbd_dev,
6152 "WARNING: kernel layering is EXPERIMENTAL!");
6153 }
6154
6d69bb53 6155 ret = rbd_dev_probe_parent(rbd_dev, depth);
30d60ba2
AE
6156 if (ret)
6157 goto err_out_probe;
6158
6159 dout("discovered format %u image, header name is %s\n",
c41d13a3 6160 rbd_dev->image_format, rbd_dev->header_oid.name);
30d60ba2 6161 return 0;
e8f59b59 6162
6fd48b3b
AE
6163err_out_probe:
6164 rbd_dev_unprobe(rbd_dev);
b644de2b 6165err_out_watch:
6d69bb53 6166 if (!depth)
99d16943 6167 rbd_unregister_watch(rbd_dev);
332bb12d
AE
6168err_out_format:
6169 rbd_dev->image_format = 0;
5655c4d9
AE
6170 kfree(rbd_dev->spec->image_id);
6171 rbd_dev->spec->image_id = NULL;
a30b71b9
AE
6172 return ret;
6173}
6174
9b60e70b
ID
6175static ssize_t do_rbd_add(struct bus_type *bus,
6176 const char *buf,
6177 size_t count)
602adf40 6178{
cb8627c7 6179 struct rbd_device *rbd_dev = NULL;
dc79b113 6180 struct ceph_options *ceph_opts = NULL;
4e9afeba 6181 struct rbd_options *rbd_opts = NULL;
859c31df 6182 struct rbd_spec *spec = NULL;
9d3997fd 6183 struct rbd_client *rbdc;
51344a38 6184 bool read_only;
b51c83c2 6185 int rc;
602adf40
YS
6186
6187 if (!try_module_get(THIS_MODULE))
6188 return -ENODEV;
6189
602adf40 6190 /* parse add command */
859c31df 6191 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 6192 if (rc < 0)
dd5ac32d 6193 goto out;
78cea76e 6194
9d3997fd
AE
6195 rbdc = rbd_get_client(ceph_opts);
6196 if (IS_ERR(rbdc)) {
6197 rc = PTR_ERR(rbdc);
0ddebc0c 6198 goto err_out_args;
9d3997fd 6199 }
602adf40 6200
602adf40 6201 /* pick the pool */
30ba1f02 6202 rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
1fe48023
ID
6203 if (rc < 0) {
6204 if (rc == -ENOENT)
6205 pr_info("pool %s does not exist\n", spec->pool_name);
602adf40 6206 goto err_out_client;
1fe48023 6207 }
c0cd10db 6208 spec->pool_id = (u64)rc;
859c31df 6209
d147543d 6210 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
b51c83c2
ID
6211 if (!rbd_dev) {
6212 rc = -ENOMEM;
bd4ba655 6213 goto err_out_client;
b51c83c2 6214 }
c53d5893
AE
6215 rbdc = NULL; /* rbd_dev now owns this */
6216 spec = NULL; /* rbd_dev now owns this */
d147543d 6217 rbd_opts = NULL; /* rbd_dev now owns this */
602adf40 6218
0d6d1e9c
MC
6219 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6220 if (!rbd_dev->config_info) {
6221 rc = -ENOMEM;
6222 goto err_out_rbd_dev;
6223 }
6224
811c6688 6225 down_write(&rbd_dev->header_rwsem);
6d69bb53 6226 rc = rbd_dev_image_probe(rbd_dev, 0);
0d6d1e9c
MC
6227 if (rc < 0) {
6228 up_write(&rbd_dev->header_rwsem);
c53d5893 6229 goto err_out_rbd_dev;
0d6d1e9c 6230 }
05fd6f6f 6231
7ce4eef7
AE
6232 /* If we are mapping a snapshot it must be marked read-only */
6233
d147543d 6234 read_only = rbd_dev->opts->read_only;
7ce4eef7
AE
6235 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6236 read_only = true;
6237 rbd_dev->mapping.read_only = read_only;
6238
b536f69a 6239 rc = rbd_dev_device_setup(rbd_dev);
3abef3b3 6240 if (rc) {
e37180c0 6241 /*
99d16943 6242 * rbd_unregister_watch() can't be moved into
e37180c0
ID
6243 * rbd_dev_image_release() without refactoring, see
6244 * commit 1f3ef78861ac.
6245 */
99d16943 6246 rbd_unregister_watch(rbd_dev);
3abef3b3 6247 rbd_dev_image_release(rbd_dev);
dd5ac32d 6248 goto out;
3abef3b3
AE
6249 }
6250
dd5ac32d
ID
6251 rc = count;
6252out:
6253 module_put(THIS_MODULE);
6254 return rc;
b536f69a 6255
c53d5893
AE
6256err_out_rbd_dev:
6257 rbd_dev_destroy(rbd_dev);
bd4ba655 6258err_out_client:
9d3997fd 6259 rbd_put_client(rbdc);
0ddebc0c 6260err_out_args:
859c31df 6261 rbd_spec_put(spec);
d147543d 6262 kfree(rbd_opts);
dd5ac32d 6263 goto out;
602adf40
YS
6264}
6265
9b60e70b
ID
6266static ssize_t rbd_add(struct bus_type *bus,
6267 const char *buf,
6268 size_t count)
6269{
6270 if (single_major)
6271 return -EINVAL;
6272
6273 return do_rbd_add(bus, buf, count);
6274}
6275
6276static ssize_t rbd_add_single_major(struct bus_type *bus,
6277 const char *buf,
6278 size_t count)
6279{
6280 return do_rbd_add(bus, buf, count);
6281}
6282
dd5ac32d 6283static void rbd_dev_device_release(struct rbd_device *rbd_dev)
602adf40 6284{
602adf40 6285 rbd_free_disk(rbd_dev);
1643dfa4
ID
6286
6287 spin_lock(&rbd_dev_list_lock);
6288 list_del_init(&rbd_dev->node);
6289 spin_unlock(&rbd_dev_list_lock);
6290
200a6a8b 6291 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
dd5ac32d 6292 device_del(&rbd_dev->dev);
6d80b130 6293 rbd_dev_mapping_clear(rbd_dev);
9b60e70b
ID
6294 if (!single_major)
6295 unregister_blkdev(rbd_dev->major, rbd_dev->name);
602adf40
YS
6296}
6297
05a46afd
AE
6298static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6299{
ad945fc1 6300 while (rbd_dev->parent) {
05a46afd
AE
6301 struct rbd_device *first = rbd_dev;
6302 struct rbd_device *second = first->parent;
6303 struct rbd_device *third;
6304
6305 /*
6306 * Follow to the parent with no grandparent and
6307 * remove it.
6308 */
6309 while (second && (third = second->parent)) {
6310 first = second;
6311 second = third;
6312 }
ad945fc1 6313 rbd_assert(second);
8ad42cd0 6314 rbd_dev_image_release(second);
ad945fc1
AE
6315 first->parent = NULL;
6316 first->parent_overlap = 0;
6317
6318 rbd_assert(first->parent_spec);
05a46afd
AE
6319 rbd_spec_put(first->parent_spec);
6320 first->parent_spec = NULL;
05a46afd
AE
6321 }
6322}
6323
9b60e70b
ID
6324static ssize_t do_rbd_remove(struct bus_type *bus,
6325 const char *buf,
6326 size_t count)
602adf40
YS
6327{
6328 struct rbd_device *rbd_dev = NULL;
751cc0e3
AE
6329 struct list_head *tmp;
6330 int dev_id;
0276dca6 6331 char opt_buf[6];
82a442d2 6332 bool already = false;
0276dca6 6333 bool force = false;
0d8189e1 6334 int ret;
602adf40 6335
0276dca6
MC
6336 dev_id = -1;
6337 opt_buf[0] = '\0';
6338 sscanf(buf, "%d %5s", &dev_id, opt_buf);
6339 if (dev_id < 0) {
6340 pr_err("dev_id out of range\n");
602adf40 6341 return -EINVAL;
0276dca6
MC
6342 }
6343 if (opt_buf[0] != '\0') {
6344 if (!strcmp(opt_buf, "force")) {
6345 force = true;
6346 } else {
6347 pr_err("bad remove option at '%s'\n", opt_buf);
6348 return -EINVAL;
6349 }
6350 }
602adf40 6351
751cc0e3
AE
6352 ret = -ENOENT;
6353 spin_lock(&rbd_dev_list_lock);
6354 list_for_each(tmp, &rbd_dev_list) {
6355 rbd_dev = list_entry(tmp, struct rbd_device, node);
6356 if (rbd_dev->dev_id == dev_id) {
6357 ret = 0;
6358 break;
6359 }
42382b70 6360 }
751cc0e3
AE
6361 if (!ret) {
6362 spin_lock_irq(&rbd_dev->lock);
0276dca6 6363 if (rbd_dev->open_count && !force)
751cc0e3
AE
6364 ret = -EBUSY;
6365 else
82a442d2
AE
6366 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6367 &rbd_dev->flags);
751cc0e3
AE
6368 spin_unlock_irq(&rbd_dev->lock);
6369 }
6370 spin_unlock(&rbd_dev_list_lock);
82a442d2 6371 if (ret < 0 || already)
1ba0f1e7 6372 return ret;
751cc0e3 6373
0276dca6
MC
6374 if (force) {
6375 /*
6376 * Prevent new IO from being queued and wait for existing
6377 * IO to complete/fail.
6378 */
6379 blk_mq_freeze_queue(rbd_dev->disk->queue);
6380 blk_set_queue_dying(rbd_dev->disk->queue);
6381 }
6382
ed95b21a
ID
6383 down_write(&rbd_dev->lock_rwsem);
6384 if (__rbd_is_lock_owner(rbd_dev))
6385 rbd_unlock(rbd_dev);
6386 up_write(&rbd_dev->lock_rwsem);
99d16943 6387 rbd_unregister_watch(rbd_dev);
fca27065 6388
9875201e
JD
6389 /*
6390 * Don't free anything from rbd_dev->disk until after all
6391 * notifies are completely processed. Otherwise
6392 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
6393 * in a potential use after free of rbd_dev->disk or rbd_dev.
6394 */
dd5ac32d 6395 rbd_dev_device_release(rbd_dev);
8ad42cd0 6396 rbd_dev_image_release(rbd_dev);
aafb230e 6397
1ba0f1e7 6398 return count;
602adf40
YS
6399}
6400
9b60e70b
ID
6401static ssize_t rbd_remove(struct bus_type *bus,
6402 const char *buf,
6403 size_t count)
6404{
6405 if (single_major)
6406 return -EINVAL;
6407
6408 return do_rbd_remove(bus, buf, count);
6409}
6410
6411static ssize_t rbd_remove_single_major(struct bus_type *bus,
6412 const char *buf,
6413 size_t count)
6414{
6415 return do_rbd_remove(bus, buf, count);
6416}
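The remove side mirrors the add path: do_rbd_remove() above expects a line of
the form "<dev_id>", optionally followed by the word "force", written to the
corresponding bus attribute (conventionally /sys/bus/rbd/remove, or
remove_single_major in single-major mode). Writing "0 force", for instance,
would detach device 0 even while it is still open, by freezing the request
queue and marking it dying before tearing the device down.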
6417
602adf40
YS
6418/*
6419 * create control files in sysfs
dfc5606d 6420 * /sys/bus/rbd/...
602adf40
YS
6421 */
6422static int rbd_sysfs_init(void)
6423{
dfc5606d 6424 int ret;
602adf40 6425
fed4c143 6426 ret = device_register(&rbd_root_dev);
21079786 6427 if (ret < 0)
dfc5606d 6428 return ret;
602adf40 6429
fed4c143
AE
6430 ret = bus_register(&rbd_bus_type);
6431 if (ret < 0)
6432 device_unregister(&rbd_root_dev);
602adf40 6433
602adf40
YS
6434 return ret;
6435}
6436
6437static void rbd_sysfs_cleanup(void)
6438{
dfc5606d 6439 bus_unregister(&rbd_bus_type);
fed4c143 6440 device_unregister(&rbd_root_dev);
602adf40
YS
6441}
6442
1c2a9dfe
AE
6443static int rbd_slab_init(void)
6444{
6445 rbd_assert(!rbd_img_request_cache);
03d94406 6446 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
868311b1
AE
6447 if (!rbd_img_request_cache)
6448 return -ENOMEM;
6449
6450 rbd_assert(!rbd_obj_request_cache);
03d94406 6451 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
78c2a44a
AE
6452 if (!rbd_obj_request_cache)
6453 goto out_err;
6454
6455 rbd_assert(!rbd_segment_name_cache);
6456 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
2d0ebc5d 6457 CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
78c2a44a 6458 if (rbd_segment_name_cache)
1c2a9dfe 6459 return 0;
78c2a44a 6460out_err:
13bf2834
JL
6461 kmem_cache_destroy(rbd_obj_request_cache);
6462 rbd_obj_request_cache = NULL;
1c2a9dfe 6463
868311b1
AE
6464 kmem_cache_destroy(rbd_img_request_cache);
6465 rbd_img_request_cache = NULL;
6466
1c2a9dfe
AE
6467 return -ENOMEM;
6468}
6469
6470static void rbd_slab_exit(void)
6471{
78c2a44a
AE
6472 rbd_assert(rbd_segment_name_cache);
6473 kmem_cache_destroy(rbd_segment_name_cache);
6474 rbd_segment_name_cache = NULL;
6475
868311b1
AE
6476 rbd_assert(rbd_obj_request_cache);
6477 kmem_cache_destroy(rbd_obj_request_cache);
6478 rbd_obj_request_cache = NULL;
6479
1c2a9dfe
AE
6480 rbd_assert(rbd_img_request_cache);
6481 kmem_cache_destroy(rbd_img_request_cache);
6482 rbd_img_request_cache = NULL;
6483}
6484
cc344fa1 6485static int __init rbd_init(void)
602adf40
YS
6486{
6487 int rc;
6488
1e32d34c
AE
6489 if (!libceph_compatible(NULL)) {
6490 rbd_warn(NULL, "libceph incompatibility (quitting)");
1e32d34c
AE
6491 return -EINVAL;
6492 }
e1b4d96d 6493
1c2a9dfe 6494 rc = rbd_slab_init();
602adf40
YS
6495 if (rc)
6496 return rc;
e1b4d96d 6497
f5ee37bd
ID
6498 /*
6499 * The number of active work items is limited by the number of
f77303bd 6500 * rbd devices * queue depth, so leave @max_active at default.
f5ee37bd
ID
6501 */
6502 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6503 if (!rbd_wq) {
6504 rc = -ENOMEM;
6505 goto err_out_slab;
6506 }
6507
9b60e70b
ID
6508 if (single_major) {
6509 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6510 if (rbd_major < 0) {
6511 rc = rbd_major;
f5ee37bd 6512 goto err_out_wq;
9b60e70b
ID
6513 }
6514 }
6515
1c2a9dfe
AE
6516 rc = rbd_sysfs_init();
6517 if (rc)
9b60e70b
ID
6518 goto err_out_blkdev;
6519
6520 if (single_major)
6521 pr_info("loaded (major %d)\n", rbd_major);
6522 else
6523 pr_info("loaded\n");
1c2a9dfe 6524
e1b4d96d
ID
6525 return 0;
6526
9b60e70b
ID
6527err_out_blkdev:
6528 if (single_major)
6529 unregister_blkdev(rbd_major, RBD_DRV_NAME);
f5ee37bd
ID
6530err_out_wq:
6531 destroy_workqueue(rbd_wq);
e1b4d96d
ID
6532err_out_slab:
6533 rbd_slab_exit();
1c2a9dfe 6534 return rc;
602adf40
YS
6535}
6536
cc344fa1 6537static void __exit rbd_exit(void)
602adf40 6538{
ffe312cf 6539 ida_destroy(&rbd_dev_id_ida);
602adf40 6540 rbd_sysfs_cleanup();
9b60e70b
ID
6541 if (single_major)
6542 unregister_blkdev(rbd_major, RBD_DRV_NAME);
f5ee37bd 6543 destroy_workqueue(rbd_wq);
1c2a9dfe 6544 rbd_slab_exit();
602adf40
YS
6545}
6546
6547module_init(rbd_init);
6548module_exit(rbd_exit);
6549
d552c619 6550MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
602adf40
YS
6551MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6552MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
602adf40
YS
6553/* following authorship retained from original osdblk.c */
6554MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6555
90da258b 6556MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
602adf40 6557MODULE_LICENSE("GPL");