/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}

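/*
 * Editor's illustration (not part of the original driver): these helpers
 * suit a counter that must never come back to life once it reaches zero,
 * such as a parent-image reference count.  The names below (dev,
 * parent_ref) are hypothetical; a caller taking such a reference only
 * while the count is still "live" would look roughly like:
 *
 *	if (atomic_inc_return_safe(&dev->parent_ref) > 0) {
 *		// reference taken, safe to use the parent
 *		...
 *		atomic_dec_return_safe(&dev->parent_ref);
 *	}
 *
 * A return of 0 means the count was already pinned at zero (no reference
 * taken), and -EINVAL means the counter was out of range.
 */
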
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING		(1<<0)
#define RBD_FEATURE_STRIPINGV2		(1<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1<<2)
#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
	OBJ_OP_WRITE,
	OBJ_OP_READ,
	OBJ_OP_DISCARD,
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);		/* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * Default to false for now, as single-major requires >= 0.75 version of
 * userspace rbd utility.
 */
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
static void rbd_spec_put(struct rbd_spec *spec);

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
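
/*
 * Editor's illustration (not part of the original source): with
 * RBD_SINGLE_MAJOR_PART_SHIFT == 4, each device owns 16 consecutive
 * minors for itself and its partitions in single-major mode, e.g.
 *
 *	rbd_dev_id_to_minor(0) == 0	rbd_dev_id_to_minor(3) == 48
 *	minor_to_rbd_dev_id(5) == 0	minor_to_rbd_dev_id(50) == 3
 *
 * so minors 48..63 all map back to the device with dev_id 3.
 */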

static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
{
	return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
	       rbd_dev->spec->snap_id == CEPH_NOSNAP &&
	       !rbd_dev->mapping.read_only;
}

static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

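/*
 * Editor's note (illustrative only, not from the original source):
 * rbd_warn() prefixes the message with the most specific identifier it
 * has, so a hypothetical call such as rbd_warn(rbd_dev, "no lock owner")
 * is printed as one of
 *
 *	rbd: rbd0: no lock owner	(disk already allocated)
 *	rbd: image foo: no lock owner	(only the image name is known)
 *	rbd: id 1234abcd: no lock owner	(only the image id is known)
 *
 * and falls back to the bare "rbd:" prefix when rbd_dev is NULL.
 */
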
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ret = 0;
	int val;
	bool ro;
	bool ro_changed = false;

	/* get_user() may sleep, so call it before taking rbd_dev->lock */
	if (get_user(val, (int __user *)(arg)))
		return -EFAULT;

	ro = val ? true : false;
	/* Snapshots don't allow writes */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	/* prevent others from opening this device */
	if (rbd_dev->open_count > 1) {
		ret = -EBUSY;
		goto out;
	}

	if (rbd_dev->mapping.read_only != ro) {
		rbd_dev->mapping.read_only = ro;
		ro_changed = true;
	}

out:
	spin_unlock_irq(&rbd_dev->lock);
	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
	if (ret == 0 && ro_changed)
		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

	return ret;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret = 0;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_err, NULL}
};

struct rbd_options {
	int	queue_depth;
	bool	read_only;
	bool	lock_on_read;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		rbd_opts->queue_depth = intval;
		break;
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	case Opt_lock_on_read:
		rbd_opts->lock_on_read = true;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}

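/*
 * Editor's illustration (not part of the original source): this callback
 * is handed each option token that libceph does not consume itself.  For
 * a hypothetical map-time option string
 *
 *	"queue_depth=512,read_only,lock_on_read"
 *
 * it runs three times: the first token matches Opt_queue_depth and stores
 * 512 via match_int(), the other two match the flag tokens and simply set
 * rbd_opts->read_only and rbd_opts->lock_on_read.  A token that matches
 * nothing in rbd_opts_tokens hits the default case and fails the map with
 * -EINVAL.
 */
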
static char* obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	default:
		return "???";
	}
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&client_mutex);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* The rest aren't used for format 1 images */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

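/*
 * Editor's illustration (not part of the original source): because the
 * OSD keeps snapc->snaps[] ordered from highest snapshot id to lowest,
 * the "reverse" comparator above lets bsearch() work on it directly.
 * For a hypothetical snaps[] = { 40, 25, 10 }:
 *
 *	rbd_dev_snap_index(rbd_dev, 40) == 0
 *	rbd_dev_snap_index(rbd_dev, 25) == 1
 *	rbd_dev_snap_index(rbd_dev, 7)  == BAD_SNAP_INDEX
 */
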
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;
	char *name_format;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	name_format = "%s.%012llx";
	if (rbd_dev->image_format == 2)
		name_format = "%s.%016llx";
	ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		rbd_segment_name_free(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

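/*
 * Editor's illustration (not part of the original source): assuming the
 * common obj_order of 22 (4 MiB objects), an image offset of 0xa30000
 * (10,682,368 bytes) maps as
 *
 *	segment              = offset >> 22          = 2
 *	rbd_segment_offset() = offset & (4 MiB - 1)  = 0x230000
 *	rbd_segment_name()   = "<object_prefix>.0000000000000002"
 *					     (format 2 name format)
 *
 * and rbd_segment_length() clips the request length so that it never
 * crosses the 4 MiB object boundary.
 */
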
/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec bv;
	struct bvec_iter iter;
	unsigned long flags;
	void *buf;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, iter) {
			if (pos + bv.bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(&bv, &flags);
				memset(buf + remainder, 0,
				       bv.bv_len - remainder);
				flush_dcache_page(bv.bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv.bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bio;

	bio = bio_clone(bio_src, gfpmask);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio_advance(bio, offset);
	bio->bi_iter.bi_size = len;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_iter.bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_iter.bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

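/*
 * Editor's illustration (not part of the original source, names are
 * hypothetical): a caller splitting an image request into per-object
 * pieces keeps a cursor into the original bio chain and repeatedly
 * clones the next object's worth of data:
 *
 *	struct bio *pos = rq_bio;	// cursor: current source bio
 *	unsigned int off = 0;		// cursor: offset within it
 *
 *	clone = bio_chain_clone_range(&pos, &off, length, GFP_NOIO);
 *
 * Each call advances pos/off past the cloned bytes, so the next call
 * picks up exactly where the previous object's data ended.
 */
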
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the response from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;

	return obj_request->img_offset <
	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}

1573static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1574 struct rbd_obj_request *obj_request)
1575{
25dcf954
AE
1576 rbd_assert(obj_request->img_request == NULL);
1577
b155e86c 1578 /* Image request now owns object's original reference */
bf0d5f50 1579 obj_request->img_request = img_request;
25dcf954 1580 obj_request->which = img_request->obj_request_count;
6365d33a
AE
1581 rbd_assert(!obj_request_img_data_test(obj_request));
1582 obj_request_img_data_set(obj_request);
bf0d5f50 1583 rbd_assert(obj_request->which != BAD_WHICH);
25dcf954
AE
1584 img_request->obj_request_count++;
1585 list_add_tail(&obj_request->links, &img_request->obj_requests);
37206ee5
AE
1586 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1587 obj_request->which);
bf0d5f50
AE
1588}
1589
1590static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1591 struct rbd_obj_request *obj_request)
1592{
1593 rbd_assert(obj_request->which != BAD_WHICH);
25dcf954 1594
37206ee5
AE
1595 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1596 obj_request->which);
bf0d5f50 1597 list_del(&obj_request->links);
25dcf954
AE
1598 rbd_assert(img_request->obj_request_count > 0);
1599 img_request->obj_request_count--;
1600 rbd_assert(obj_request->which == img_request->obj_request_count);
1601 obj_request->which = BAD_WHICH;
6365d33a 1602 rbd_assert(obj_request_img_data_test(obj_request));
bf0d5f50 1603 rbd_assert(obj_request->img_request == img_request);
bf0d5f50 1604 obj_request->img_request = NULL;
25dcf954 1605 obj_request->callback = NULL;
bf0d5f50
AE
1606 rbd_obj_request_put(obj_request);
1607}
1608
1609static bool obj_request_type_valid(enum obj_request_type type)
1610{
1611 switch (type) {
9969ebc5 1612 case OBJ_REQUEST_NODATA:
bf0d5f50 1613 case OBJ_REQUEST_BIO:
788e2df3 1614 case OBJ_REQUEST_PAGES:
bf0d5f50
AE
1615 return true;
1616 default:
1617 return false;
1618 }
1619}
1620
980917fc 1621static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
bf0d5f50 1622{
980917fc
ID
1623 struct ceph_osd_request *osd_req = obj_request->osd_req;
1624
1625 dout("%s %p osd_req %p\n", __func__, obj_request, osd_req);
1626 ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
bf0d5f50
AE
1627}
1628
71c20a06
ID
1629static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
1630{
1631 dout("%s %p\n", __func__, obj_request);
1632 ceph_osdc_cancel_request(obj_request->osd_req);
1633}
1634
1635/*
1636 * Wait for an object request to complete. If interrupted, cancel the
1637 * underlying osd request.
2894e1d7
ID
1638 *
1639 * @timeout: in jiffies, 0 means "wait forever"
71c20a06 1640 */
2894e1d7
ID
1641static int __rbd_obj_request_wait(struct rbd_obj_request *obj_request,
1642 unsigned long timeout)
71c20a06 1643{
2894e1d7 1644 long ret;
71c20a06
ID
1645
1646 dout("%s %p\n", __func__, obj_request);
2894e1d7
ID
1647 ret = wait_for_completion_interruptible_timeout(
1648 &obj_request->completion,
1649 ceph_timeout_jiffies(timeout));
1650 if (ret <= 0) {
1651 if (ret == 0)
1652 ret = -ETIMEDOUT;
71c20a06 1653 rbd_obj_request_end(obj_request);
2894e1d7
ID
1654 } else {
1655 ret = 0;
71c20a06
ID
1656 }
1657
2894e1d7
ID
1658 dout("%s %p ret %d\n", __func__, obj_request, (int)ret);
1659 return ret;
1660}
1661
1662static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1663{
1664 return __rbd_obj_request_wait(obj_request, 0);
1665}
1666
bf0d5f50
AE
1667static void rbd_img_request_complete(struct rbd_img_request *img_request)
1668{
55f27e09 1669
37206ee5 1670 dout("%s: img %p\n", __func__, img_request);
55f27e09
AE
1671
1672 /*
1673 * If no error occurred, compute the aggregate transfer
1674 * count for the image request. We could instead use
1675 * atomic64_cmpxchg() to update it as each object request
1676 * completes; not clear which way is better off hand.
1677 */
1678 if (!img_request->result) {
1679 struct rbd_obj_request *obj_request;
1680 u64 xferred = 0;
1681
1682 for_each_obj_request(img_request, obj_request)
1683 xferred += obj_request->xferred;
1684 img_request->xferred = xferred;
1685 }
1686
bf0d5f50
AE
1687 if (img_request->callback)
1688 img_request->callback(img_request);
1689 else
1690 rbd_img_request_put(img_request);
1691}
1692
0c425248
AE
1693/*
1694 * The default/initial value for all image request flags is 0. Each
1695 * is conditionally set to 1 at image request initialization time
1696 * and currently never change thereafter.
1697 */
1698static void img_request_write_set(struct rbd_img_request *img_request)
1699{
1700 set_bit(IMG_REQ_WRITE, &img_request->flags);
1701 smp_mb();
1702}
1703
1704static bool img_request_write_test(struct rbd_img_request *img_request)
1705{
1706 smp_mb();
1707 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1708}
1709
90e98c52
GZ
1710/*
1711 * Set the discard flag when the img_request is an discard request
1712 */
1713static void img_request_discard_set(struct rbd_img_request *img_request)
1714{
1715 set_bit(IMG_REQ_DISCARD, &img_request->flags);
1716 smp_mb();
1717}
1718
1719static bool img_request_discard_test(struct rbd_img_request *img_request)
1720{
1721 smp_mb();
1722 return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1723}
1724
9849e986
AE
1725static void img_request_child_set(struct rbd_img_request *img_request)
1726{
1727 set_bit(IMG_REQ_CHILD, &img_request->flags);
1728 smp_mb();
1729}
1730
e93f3152
AE
1731static void img_request_child_clear(struct rbd_img_request *img_request)
1732{
1733 clear_bit(IMG_REQ_CHILD, &img_request->flags);
1734 smp_mb();
1735}
1736
9849e986
AE
1737static bool img_request_child_test(struct rbd_img_request *img_request)
1738{
1739 smp_mb();
1740 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1741}
1742
d0b2e944
AE
1743static void img_request_layered_set(struct rbd_img_request *img_request)
1744{
1745 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1746 smp_mb();
1747}
1748
a2acd00e
AE
1749static void img_request_layered_clear(struct rbd_img_request *img_request)
1750{
1751 clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1752 smp_mb();
1753}
1754
d0b2e944
AE
1755static bool img_request_layered_test(struct rbd_img_request *img_request)
1756{
1757 smp_mb();
1758 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1759}
1760
3b434a2a
JD
1761static enum obj_operation_type
1762rbd_img_request_op_type(struct rbd_img_request *img_request)
1763{
1764 if (img_request_write_test(img_request))
1765 return OBJ_OP_WRITE;
1766 else if (img_request_discard_test(img_request))
1767 return OBJ_OP_DISCARD;
1768 else
1769 return OBJ_OP_READ;
1770}
1771
6e2a4505
AE
1772static void
1773rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1774{
b9434c5b
AE
1775 u64 xferred = obj_request->xferred;
1776 u64 length = obj_request->length;
1777
6e2a4505
AE
1778 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1779 obj_request, obj_request->img_request, obj_request->result,
b9434c5b 1780 xferred, length);
6e2a4505 1781 /*
17c1cc1d
JD
1782 * ENOENT means a hole in the image. We zero-fill the entire
1783 * length of the request. A short read also implies zero-fill
1784 * to the end of the request. An error requires the whole
1785 * length of the request to be reported finished with an error
1786 * to the block layer. In each case we update the xferred
1787 * count to indicate the whole request was satisfied.
6e2a4505 1788 */
b9434c5b 1789 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
6e2a4505 1790 if (obj_request->result == -ENOENT) {
b9434c5b
AE
1791 if (obj_request->type == OBJ_REQUEST_BIO)
1792 zero_bio_chain(obj_request->bio_list, 0);
1793 else
1794 zero_pages(obj_request->pages, 0, length);
6e2a4505 1795 obj_request->result = 0;
b9434c5b
AE
1796 } else if (xferred < length && !obj_request->result) {
1797 if (obj_request->type == OBJ_REQUEST_BIO)
1798 zero_bio_chain(obj_request->bio_list, xferred);
1799 else
1800 zero_pages(obj_request->pages, xferred, length);
6e2a4505 1801 }
17c1cc1d 1802 obj_request->xferred = length;
6e2a4505
AE
1803 obj_request_done_set(obj_request);
1804}
1805
bf0d5f50
AE
1806static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1807{
37206ee5
AE
1808 dout("%s: obj %p cb %p\n", __func__, obj_request,
1809 obj_request->callback);
bf0d5f50
AE
1810 if (obj_request->callback)
1811 obj_request->callback(obj_request);
788e2df3
AE
1812 else
1813 complete_all(&obj_request->completion);
bf0d5f50
AE
1814}
1815
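/*
 * Handle completion of a read op. A read that hit a hole within the
 * parent overlap of a layered image is redirected to the parent;
 * otherwise holes and short reads are zero-filled and the object
 * request is marked done.
 */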
c47f9371 1816static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1817{
57acbaa7 1818 struct rbd_img_request *img_request = NULL;
a9e8ba2c 1819 struct rbd_device *rbd_dev = NULL;
57acbaa7
AE
1820 bool layered = false;
1821
1822 if (obj_request_img_data_test(obj_request)) {
1823 img_request = obj_request->img_request;
1824 layered = img_request && img_request_layered_test(img_request);
a9e8ba2c 1825 rbd_dev = img_request->rbd_dev;
57acbaa7 1826 }
8b3e1a56
AE
1827
1828 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1829 obj_request, img_request, obj_request->result,
1830 obj_request->xferred, obj_request->length);
a9e8ba2c
AE
1831 if (layered && obj_request->result == -ENOENT &&
1832 obj_request->img_offset < rbd_dev->parent_overlap)
8b3e1a56
AE
1833 rbd_img_parent_read(obj_request);
1834 else if (img_request)
6e2a4505
AE
1835 rbd_img_obj_request_read_callback(obj_request);
1836 else
1837 obj_request_done_set(obj_request);
bf0d5f50
AE
1838}
1839
c47f9371 1840static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1841{
1b83bef2
SW
1842 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1843 obj_request->result, obj_request->length);
1844 /*
8b3e1a56
AE
1845 * There is no such thing as a successful short write. Set
1846 * it to our originally-requested length.
1b83bef2
SW
1847 */
1848 obj_request->xferred = obj_request->length;
07741308 1849 obj_request_done_set(obj_request);
bf0d5f50
AE
1850}
1851
90e98c52
GZ
1852static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1853{
1854 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1855 obj_request->result, obj_request->length);
1856 /*
1857 * There is no such thing as a successful short discard. Set
1858 * it to our originally-requested length.
1859 */
1860 obj_request->xferred = obj_request->length;
d0265de7
JD
1861 /* discarding a non-existent object is not a problem */
1862 if (obj_request->result == -ENOENT)
1863 obj_request->result = 0;
90e98c52
GZ
1864 obj_request_done_set(obj_request);
1865}
1866
fbfab539
AE
1867/*
1868 * For a simple stat call there's nothing to do. We'll do more if
1869 * this is part of a write sequence for a layered image.
1870 */
c47f9371 1871static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
fbfab539 1872{
37206ee5 1873 dout("%s: obj %p\n", __func__, obj_request);
fbfab539
AE
1874 obj_request_done_set(obj_request);
1875}
1876
2761713d
ID
1877static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1878{
1879 dout("%s: obj %p\n", __func__, obj_request);
1880
1881 if (obj_request_img_data_test(obj_request))
1882 rbd_osd_copyup_callback(obj_request);
1883 else
1884 obj_request_done_set(obj_request);
1885}
1886
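/*
 * Completion callback for all rbd osd requests: record the result and
 * transfer count, then dispatch on the first op's opcode to the
 * read, write, discard, stat or call handler.
 */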
85e084fe 1887static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
bf0d5f50
AE
1888{
1889 struct rbd_obj_request *obj_request = osd_req->r_priv;
bf0d5f50
AE
1890 u16 opcode;
1891
85e084fe 1892 dout("%s: osd_req %p\n", __func__, osd_req);
bf0d5f50 1893 rbd_assert(osd_req == obj_request->osd_req);
57acbaa7
AE
1894 if (obj_request_img_data_test(obj_request)) {
1895 rbd_assert(obj_request->img_request);
1896 rbd_assert(obj_request->which != BAD_WHICH);
1897 } else {
1898 rbd_assert(obj_request->which == BAD_WHICH);
1899 }
bf0d5f50 1900
1b83bef2
SW
1901 if (osd_req->r_result < 0)
1902 obj_request->result = osd_req->r_result;
bf0d5f50 1903
c47f9371
AE
1904 /*
1905 * We support a 64-bit length, but ultimately it has to be
7ad18afa
CH
1906 * passed to the block layer, which just supports a 32-bit
1907 * length field.
c47f9371 1908 */
7665d85b 1909 obj_request->xferred = osd_req->r_ops[0].outdata_len;
8b3e1a56 1910 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
0ccd5926 1911
79528734 1912 opcode = osd_req->r_ops[0].op;
bf0d5f50
AE
1913 switch (opcode) {
1914 case CEPH_OSD_OP_READ:
c47f9371 1915 rbd_osd_read_callback(obj_request);
bf0d5f50 1916 break;
0ccd5926 1917 case CEPH_OSD_OP_SETALLOCHINT:
e30b7577
ID
1918 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1919 osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
0ccd5926 1920 /* fall through */
bf0d5f50 1921 case CEPH_OSD_OP_WRITE:
e30b7577 1922 case CEPH_OSD_OP_WRITEFULL:
c47f9371 1923 rbd_osd_write_callback(obj_request);
bf0d5f50 1924 break;
fbfab539 1925 case CEPH_OSD_OP_STAT:
c47f9371 1926 rbd_osd_stat_callback(obj_request);
fbfab539 1927 break;
90e98c52
GZ
1928 case CEPH_OSD_OP_DELETE:
1929 case CEPH_OSD_OP_TRUNCATE:
1930 case CEPH_OSD_OP_ZERO:
1931 rbd_osd_discard_callback(obj_request);
1932 break;
36be9a76 1933 case CEPH_OSD_OP_CALL:
2761713d
ID
1934 rbd_osd_call_callback(obj_request);
1935 break;
bf0d5f50 1936 default:
9584d508 1937 rbd_warn(NULL, "%s: unsupported op %hu",
bf0d5f50
AE
1938 obj_request->object_name, (unsigned short) opcode);
1939 break;
1940 }
1941
07741308 1942 if (obj_request_done_test(obj_request))
bf0d5f50
AE
1943 rbd_obj_request_complete(obj_request);
1944}
1945
9d4df01f 1946static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
430c28c3
AE
1947{
1948 struct rbd_img_request *img_request = obj_request->img_request;
8c042b0d 1949 struct ceph_osd_request *osd_req = obj_request->osd_req;
430c28c3 1950
bb873b53
ID
1951 if (img_request)
1952 osd_req->r_snapid = img_request->snap_id;
9d4df01f
AE
1953}
1954
1955static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1956{
9d4df01f 1957 struct ceph_osd_request *osd_req = obj_request->osd_req;
9d4df01f 1958
bb873b53
ID
1959 osd_req->r_mtime = CURRENT_TIME;
1960 osd_req->r_data_offset = obj_request->offset;
430c28c3
AE
1961}
1962
0ccd5926
ID
1963/*
1964 * Create an osd request. A read request has one osd op (read).
1965 * A write request has either one (watch) or two (hint+write) osd ops.
1966 * (All rbd data writes are prefixed with an allocation hint op, but
1967 * technically osd watch is a write request, hence this distinction.)
1968 */
bf0d5f50
AE
1969static struct ceph_osd_request *rbd_osd_req_create(
1970 struct rbd_device *rbd_dev,
6d2940c8 1971 enum obj_operation_type op_type,
deb236b3 1972 unsigned int num_ops,
430c28c3 1973 struct rbd_obj_request *obj_request)
bf0d5f50 1974{
bf0d5f50
AE
1975 struct ceph_snap_context *snapc = NULL;
1976 struct ceph_osd_client *osdc;
1977 struct ceph_osd_request *osd_req;
bf0d5f50 1978
90e98c52
GZ
1979 if (obj_request_img_data_test(obj_request) &&
1980 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
6365d33a 1981 struct rbd_img_request *img_request = obj_request->img_request;
90e98c52
GZ
1982 if (op_type == OBJ_OP_WRITE) {
1983 rbd_assert(img_request_write_test(img_request));
1984 } else {
1985 rbd_assert(img_request_discard_test(img_request));
1986 }
6d2940c8 1987 snapc = img_request->snapc;
bf0d5f50
AE
1988 }
1989
6d2940c8 1990 rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
deb236b3
ID
1991
1992 /* Allocate and initialize the request, for the num_ops ops */
bf0d5f50
AE
1993
1994 osdc = &rbd_dev->rbd_client->client->osdc;
deb236b3 1995 osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
2224d879 1996 GFP_NOIO);
bf0d5f50 1997 if (!osd_req)
13d1ad16 1998 goto fail;
bf0d5f50 1999
90e98c52 2000 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
bf0d5f50 2001 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
430c28c3 2002 else
bf0d5f50 2003 osd_req->r_flags = CEPH_OSD_FLAG_READ;
bf0d5f50
AE
2004
2005 osd_req->r_callback = rbd_osd_req_callback;
2006 osd_req->r_priv = obj_request;
2007
7627151e 2008 osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
d30291b9
ID
2009 if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
2010 obj_request->object_name))
2011 goto fail;
bf0d5f50 2012
13d1ad16
ID
2013 if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
2014 goto fail;
2015
bf0d5f50 2016 return osd_req;
13d1ad16
ID
2017
2018fail:
2019 ceph_osdc_put_request(osd_req);
2020 return NULL;
bf0d5f50
AE
2021}
2022
0eefd470 2023/*
d3246fb0
JD
2024 * Create a copyup osd request based on the information in the object
2025 * request supplied. A copyup request has two or three osd ops: a
2026 * copyup method call, potentially a hint op, and a write or truncate
2027 * or zero op.
0eefd470
AE
2028 */
2029static struct ceph_osd_request *
2030rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
2031{
2032 struct rbd_img_request *img_request;
2033 struct ceph_snap_context *snapc;
2034 struct rbd_device *rbd_dev;
2035 struct ceph_osd_client *osdc;
2036 struct ceph_osd_request *osd_req;
d3246fb0 2037 int num_osd_ops = 3;
0eefd470
AE
2038
2039 rbd_assert(obj_request_img_data_test(obj_request));
2040 img_request = obj_request->img_request;
2041 rbd_assert(img_request);
d3246fb0
JD
2042 rbd_assert(img_request_write_test(img_request) ||
2043 img_request_discard_test(img_request));
0eefd470 2044
d3246fb0
JD
2045 if (img_request_discard_test(img_request))
2046 num_osd_ops = 2;
2047
2048 /* Allocate and initialize the request, for all the ops */
0eefd470
AE
2049
2050 snapc = img_request->snapc;
2051 rbd_dev = img_request->rbd_dev;
2052 osdc = &rbd_dev->rbd_client->client->osdc;
d3246fb0 2053 osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
2224d879 2054 false, GFP_NOIO);
0eefd470 2055 if (!osd_req)
13d1ad16 2056 goto fail;
0eefd470
AE
2057
2058 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
2059 osd_req->r_callback = rbd_osd_req_callback;
2060 osd_req->r_priv = obj_request;
2061
7627151e 2062 osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
d30291b9
ID
2063 if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
2064 obj_request->object_name))
2065 goto fail;
0eefd470 2066
13d1ad16
ID
2067 if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
2068 goto fail;
2069
0eefd470 2070 return osd_req;
13d1ad16
ID
2071
2072fail:
2073 ceph_osdc_put_request(osd_req);
2074 return NULL;
0eefd470
AE
2075}
2076
2077
bf0d5f50
AE
2078static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2079{
2080 ceph_osdc_put_request(osd_req);
2081}
2082
2083/* object_name is assumed to be a non-null pointer and NUL-terminated */
2084
2085static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
2086 u64 offset, u64 length,
2087 enum obj_request_type type)
2088{
2089 struct rbd_obj_request *obj_request;
2090 size_t size;
2091 char *name;
2092
2093 rbd_assert(obj_request_type_valid(type));
2094
2095 size = strlen(object_name) + 1;
5a60e876 2096 name = kmalloc(size, GFP_NOIO);
f907ad55 2097 if (!name)
bf0d5f50
AE
2098 return NULL;
2099
5a60e876 2100 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
f907ad55
AE
2101 if (!obj_request) {
2102 kfree(name);
2103 return NULL;
2104 }
2105
bf0d5f50
AE
2106 obj_request->object_name = memcpy(name, object_name, size);
2107 obj_request->offset = offset;
2108 obj_request->length = length;
926f9b3f 2109 obj_request->flags = 0;
bf0d5f50
AE
2110 obj_request->which = BAD_WHICH;
2111 obj_request->type = type;
2112 INIT_LIST_HEAD(&obj_request->links);
788e2df3 2113 init_completion(&obj_request->completion);
bf0d5f50
AE
2114 kref_init(&obj_request->kref);
2115
37206ee5
AE
2116 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
2117 offset, length, (int)type, obj_request);
2118
bf0d5f50
AE
2119 return obj_request;
2120}
2121
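/* Release an object request's data buffers and name, then free it */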
2122static void rbd_obj_request_destroy(struct kref *kref)
2123{
2124 struct rbd_obj_request *obj_request;
2125
2126 obj_request = container_of(kref, struct rbd_obj_request, kref);
2127
37206ee5
AE
2128 dout("%s: obj %p\n", __func__, obj_request);
2129
bf0d5f50
AE
2130 rbd_assert(obj_request->img_request == NULL);
2131 rbd_assert(obj_request->which == BAD_WHICH);
2132
2133 if (obj_request->osd_req)
2134 rbd_osd_req_destroy(obj_request->osd_req);
2135
2136 rbd_assert(obj_request_type_valid(obj_request->type));
2137 switch (obj_request->type) {
9969ebc5
AE
2138 case OBJ_REQUEST_NODATA:
2139 break; /* Nothing to do */
bf0d5f50
AE
2140 case OBJ_REQUEST_BIO:
2141 if (obj_request->bio_list)
2142 bio_chain_put(obj_request->bio_list);
2143 break;
788e2df3
AE
2144 case OBJ_REQUEST_PAGES:
2145 if (obj_request->pages)
2146 ceph_release_page_vector(obj_request->pages,
2147 obj_request->page_count);
2148 break;
bf0d5f50
AE
2149 }
2150
f907ad55 2151 kfree(obj_request->object_name);
868311b1
AE
2152 obj_request->object_name = NULL;
2153 kmem_cache_free(rbd_obj_request_cache, obj_request);
bf0d5f50
AE
2154}
2155
fb65d228
AE
2156/* It's OK to call this for a device with no parent */
2157
2158static void rbd_spec_put(struct rbd_spec *spec);
2159static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2160{
2161 rbd_dev_remove_parent(rbd_dev);
2162 rbd_spec_put(rbd_dev->parent_spec);
2163 rbd_dev->parent_spec = NULL;
2164 rbd_dev->parent_overlap = 0;
2165}
2166
a2acd00e
AE
2167/*
2168 * Parent image reference counting is used to determine when an
2169 * image's parent fields can be safely torn down--after there are no
2170 * more in-flight requests to the parent image. When the last
2171 * reference is dropped, cleaning them up is safe.
2172 */
2173static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2174{
2175 int counter;
2176
2177 if (!rbd_dev->parent_spec)
2178 return;
2179
2180 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2181 if (counter > 0)
2182 return;
2183
2184 /* Last reference; clean up parent data structures */
2185
2186 if (!counter)
2187 rbd_dev_unparent(rbd_dev);
2188 else
9584d508 2189 rbd_warn(rbd_dev, "parent reference underflow");
a2acd00e
AE
2190}
2191
2192/*
2193 * If an image has a non-zero parent overlap, get a reference to its
2194 * parent.
2195 *
2196 * Returns true if the rbd device has a parent with a non-zero
2197 * overlap and a reference for it was successfully taken, or
2198 * false otherwise.
2199 */
2200static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2201{
ae43e9d0 2202 int counter = 0;
a2acd00e
AE
2203
2204 if (!rbd_dev->parent_spec)
2205 return false;
2206
ae43e9d0
ID
2207 down_read(&rbd_dev->header_rwsem);
2208 if (rbd_dev->parent_overlap)
2209 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2210 up_read(&rbd_dev->header_rwsem);
a2acd00e
AE
2211
2212 if (counter < 0)
9584d508 2213 rbd_warn(rbd_dev, "parent reference overflow");
a2acd00e 2214
ae43e9d0 2215 return counter > 0;
a2acd00e
AE
2216}
2217
bf0d5f50
AE
2218/*
2219 * Caller is responsible for filling in the list of object requests
2220 * that comprises the image request, and the Linux request pointer
2221 * (if there is one).
2222 */
cc344fa1
AE
2223static struct rbd_img_request *rbd_img_request_create(
2224 struct rbd_device *rbd_dev,
bf0d5f50 2225 u64 offset, u64 length,
6d2940c8 2226 enum obj_operation_type op_type,
4e752f0a 2227 struct ceph_snap_context *snapc)
bf0d5f50
AE
2228{
2229 struct rbd_img_request *img_request;
bf0d5f50 2230
7a716aac 2231 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
bf0d5f50
AE
2232 if (!img_request)
2233 return NULL;
2234
bf0d5f50
AE
2235 img_request->rq = NULL;
2236 img_request->rbd_dev = rbd_dev;
2237 img_request->offset = offset;
2238 img_request->length = length;
0c425248 2239 img_request->flags = 0;
90e98c52
GZ
2240 if (op_type == OBJ_OP_DISCARD) {
2241 img_request_discard_set(img_request);
2242 img_request->snapc = snapc;
2243 } else if (op_type == OBJ_OP_WRITE) {
0c425248 2244 img_request_write_set(img_request);
4e752f0a 2245 img_request->snapc = snapc;
0c425248 2246 } else {
bf0d5f50 2247 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 2248 }
a2acd00e 2249 if (rbd_dev_parent_get(rbd_dev))
d0b2e944 2250 img_request_layered_set(img_request);
bf0d5f50
AE
2251 spin_lock_init(&img_request->completion_lock);
2252 img_request->next_completion = 0;
2253 img_request->callback = NULL;
a5a337d4 2254 img_request->result = 0;
bf0d5f50
AE
2255 img_request->obj_request_count = 0;
2256 INIT_LIST_HEAD(&img_request->obj_requests);
2257 kref_init(&img_request->kref);
2258
37206ee5 2259 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
6d2940c8 2260 obj_op_name(op_type), offset, length, img_request);
37206ee5 2261
bf0d5f50
AE
2262 return img_request;
2263}
2264
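/*
 * Destroy an image request once its last reference is dropped: delete
 * its object requests, drop any parent reference and snapshot context,
 * and free the request itself.
 */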
2265static void rbd_img_request_destroy(struct kref *kref)
2266{
2267 struct rbd_img_request *img_request;
2268 struct rbd_obj_request *obj_request;
2269 struct rbd_obj_request *next_obj_request;
2270
2271 img_request = container_of(kref, struct rbd_img_request, kref);
2272
37206ee5
AE
2273 dout("%s: img %p\n", __func__, img_request);
2274
bf0d5f50
AE
2275 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2276 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 2277 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 2278
a2acd00e
AE
2279 if (img_request_layered_test(img_request)) {
2280 img_request_layered_clear(img_request);
2281 rbd_dev_parent_put(img_request->rbd_dev);
2282 }
2283
bef95455
JD
2284 if (img_request_write_test(img_request) ||
2285 img_request_discard_test(img_request))
812164f8 2286 ceph_put_snap_context(img_request->snapc);
bf0d5f50 2287
1c2a9dfe 2288 kmem_cache_free(rbd_img_request_cache, img_request);
bf0d5f50
AE
2289}
2290
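/*
 * Create a child image request that reads the given range from the
 * parent image on behalf of the supplied object request.
 */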
e93f3152
AE
2291static struct rbd_img_request *rbd_parent_request_create(
2292 struct rbd_obj_request *obj_request,
2293 u64 img_offset, u64 length)
2294{
2295 struct rbd_img_request *parent_request;
2296 struct rbd_device *rbd_dev;
2297
2298 rbd_assert(obj_request->img_request);
2299 rbd_dev = obj_request->img_request->rbd_dev;
2300
4e752f0a 2301 parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
6d2940c8 2302 length, OBJ_OP_READ, NULL);
e93f3152
AE
2303 if (!parent_request)
2304 return NULL;
2305
2306 img_request_child_set(parent_request);
2307 rbd_obj_request_get(obj_request);
2308 parent_request->obj_request = obj_request;
2309
2310 return parent_request;
2311}
2312
2313static void rbd_parent_request_destroy(struct kref *kref)
2314{
2315 struct rbd_img_request *parent_request;
2316 struct rbd_obj_request *orig_request;
2317
2318 parent_request = container_of(kref, struct rbd_img_request, kref);
2319 orig_request = parent_request->obj_request;
2320
2321 parent_request->obj_request = NULL;
2322 rbd_obj_request_put(orig_request);
2323 img_request_child_clear(parent_request);
2324
2325 rbd_img_request_destroy(kref);
2326}
2327
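/*
 * Account for the completion of one object request within an image
 * request, reporting progress to the block layer for top-level
 * requests. Returns true if more object requests remain to complete.
 */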
1217857f
AE
2328static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2329{
6365d33a 2330 struct rbd_img_request *img_request;
1217857f
AE
2331 unsigned int xferred;
2332 int result;
8b3e1a56 2333 bool more;
1217857f 2334
6365d33a
AE
2335 rbd_assert(obj_request_img_data_test(obj_request));
2336 img_request = obj_request->img_request;
2337
1217857f
AE
2338 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2339 xferred = (unsigned int)obj_request->xferred;
2340 result = obj_request->result;
2341 if (result) {
2342 struct rbd_device *rbd_dev = img_request->rbd_dev;
6d2940c8
GZ
2343 enum obj_operation_type op_type;
2344
90e98c52
GZ
2345 if (img_request_discard_test(img_request))
2346 op_type = OBJ_OP_DISCARD;
2347 else if (img_request_write_test(img_request))
2348 op_type = OBJ_OP_WRITE;
2349 else
2350 op_type = OBJ_OP_READ;
1217857f 2351
9584d508 2352 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
6d2940c8
GZ
2353 obj_op_name(op_type), obj_request->length,
2354 obj_request->img_offset, obj_request->offset);
9584d508 2355 rbd_warn(rbd_dev, " result %d xferred %x",
1217857f
AE
2356 result, xferred);
2357 if (!img_request->result)
2358 img_request->result = result;
082a75da
ID
2359 /*
2360 * Need to end I/O on the entire obj_request worth of
2361 * bytes in case of error.
2362 */
2363 xferred = obj_request->length;
1217857f
AE
2364 }
2365
f1a4739f
AE
2366 /* Image object requests don't own their page array */
2367
2368 if (obj_request->type == OBJ_REQUEST_PAGES) {
2369 obj_request->pages = NULL;
2370 obj_request->page_count = 0;
2371 }
2372
8b3e1a56
AE
2373 if (img_request_child_test(img_request)) {
2374 rbd_assert(img_request->obj_request != NULL);
2375 more = obj_request->which < img_request->obj_request_count - 1;
2376 } else {
2377 rbd_assert(img_request->rq != NULL);
7ad18afa
CH
2378
2379 more = blk_update_request(img_request->rq, result, xferred);
2380 if (!more)
2381 __blk_mq_end_request(img_request->rq, result);
8b3e1a56
AE
2382 }
2383
2384 return more;
1217857f
AE
2385}
2386
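/*
 * Per-object completion callback for image requests. Object requests
 * are processed in order: walk forward from the next expected
 * completion, ending each finished request, and complete the image
 * request once the last one is done.
 */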
2169238d
AE
2387static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2388{
2389 struct rbd_img_request *img_request;
2390 u32 which = obj_request->which;
2391 bool more = true;
2392
6365d33a 2393 rbd_assert(obj_request_img_data_test(obj_request));
2169238d
AE
2394 img_request = obj_request->img_request;
2395
2396 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2397 rbd_assert(img_request != NULL);
2169238d
AE
2398 rbd_assert(img_request->obj_request_count > 0);
2399 rbd_assert(which != BAD_WHICH);
2400 rbd_assert(which < img_request->obj_request_count);
2169238d
AE
2401
2402 spin_lock_irq(&img_request->completion_lock);
2403 if (which != img_request->next_completion)
2404 goto out;
2405
2406 for_each_obj_request_from(img_request, obj_request) {
2169238d
AE
2407 rbd_assert(more);
2408 rbd_assert(which < img_request->obj_request_count);
2409
2410 if (!obj_request_done_test(obj_request))
2411 break;
1217857f 2412 more = rbd_img_obj_end_request(obj_request);
2169238d
AE
2413 which++;
2414 }
2415
2416 rbd_assert(more ^ (which == img_request->obj_request_count));
2417 img_request->next_completion = which;
2418out:
2419 spin_unlock_irq(&img_request->completion_lock);
0f2d5be7 2420 rbd_img_request_put(img_request);
2169238d
AE
2421
2422 if (!more)
2423 rbd_img_request_complete(img_request);
2424}
2425
3b434a2a
JD
2426/*
2427 * Add individual osd ops to the given ceph_osd_request and prepare
2428 * them for submission. num_ops is the current number of
2429 * osd operations already added to the object request.
2430 */
2431static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2432 struct ceph_osd_request *osd_request,
2433 enum obj_operation_type op_type,
2434 unsigned int num_ops)
2435{
2436 struct rbd_img_request *img_request = obj_request->img_request;
2437 struct rbd_device *rbd_dev = img_request->rbd_dev;
2438 u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2439 u64 offset = obj_request->offset;
2440 u64 length = obj_request->length;
2441 u64 img_end;
2442 u16 opcode;
2443
2444 if (op_type == OBJ_OP_DISCARD) {
d3246fb0
JD
2445 if (!offset && length == object_size &&
2446 (!img_request_layered_test(img_request) ||
2447 !obj_request_overlaps_parent(obj_request))) {
3b434a2a
JD
2448 opcode = CEPH_OSD_OP_DELETE;
2449 } else if ((offset + length == object_size)) {
2450 opcode = CEPH_OSD_OP_TRUNCATE;
2451 } else {
2452 down_read(&rbd_dev->header_rwsem);
2453 img_end = rbd_dev->header.image_size;
2454 up_read(&rbd_dev->header_rwsem);
2455
2456 if (obj_request->img_offset + length == img_end)
2457 opcode = CEPH_OSD_OP_TRUNCATE;
2458 else
2459 opcode = CEPH_OSD_OP_ZERO;
2460 }
2461 } else if (op_type == OBJ_OP_WRITE) {
e30b7577
ID
2462 if (!offset && length == object_size)
2463 opcode = CEPH_OSD_OP_WRITEFULL;
2464 else
2465 opcode = CEPH_OSD_OP_WRITE;
3b434a2a
JD
2466 osd_req_op_alloc_hint_init(osd_request, num_ops,
2467 object_size, object_size);
2468 num_ops++;
2469 } else {
2470 opcode = CEPH_OSD_OP_READ;
2471 }
2472
7e868b6e 2473 if (opcode == CEPH_OSD_OP_DELETE)
144cba14 2474 osd_req_op_init(osd_request, num_ops, opcode, 0);
7e868b6e
ID
2475 else
2476 osd_req_op_extent_init(osd_request, num_ops, opcode,
2477 offset, length, 0, 0);
2478
3b434a2a
JD
2479 if (obj_request->type == OBJ_REQUEST_BIO)
2480 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2481 obj_request->bio_list, length);
2482 else if (obj_request->type == OBJ_REQUEST_PAGES)
2483 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2484 obj_request->pages, length,
2485 offset & ~PAGE_MASK, false, false);
2486
2487 /* Discards are also writes */
2488 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2489 rbd_osd_req_format_write(obj_request);
2490 else
2491 rbd_osd_req_format_read(obj_request);
2492}
2493
f1a4739f
AE
2494/*
2495 * Split up an image request into one or more object requests, each
2496 * to a different object. The "type" parameter indicates whether
2497 * "data_desc" is the pointer to the head of a list of bio
2498 * structures, or the base of a page array. In either case this
2499 * function assumes data_desc describes memory sufficient to hold
2500 * all data described by the image request.
2501 */
2502static int rbd_img_request_fill(struct rbd_img_request *img_request,
2503 enum obj_request_type type,
2504 void *data_desc)
bf0d5f50
AE
2505{
2506 struct rbd_device *rbd_dev = img_request->rbd_dev;
2507 struct rbd_obj_request *obj_request = NULL;
2508 struct rbd_obj_request *next_obj_request;
a158073c 2509 struct bio *bio_list = NULL;
f1a4739f 2510 unsigned int bio_offset = 0;
a158073c 2511 struct page **pages = NULL;
6d2940c8 2512 enum obj_operation_type op_type;
7da22d29 2513 u64 img_offset;
bf0d5f50 2514 u64 resid;
bf0d5f50 2515
f1a4739f
AE
2516 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2517 (int)type, data_desc);
37206ee5 2518
7da22d29 2519 img_offset = img_request->offset;
bf0d5f50 2520 resid = img_request->length;
4dda41d3 2521 rbd_assert(resid > 0);
3b434a2a 2522 op_type = rbd_img_request_op_type(img_request);
f1a4739f
AE
2523
2524 if (type == OBJ_REQUEST_BIO) {
2525 bio_list = data_desc;
4f024f37
KO
2526 rbd_assert(img_offset ==
2527 bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
90e98c52 2528 } else if (type == OBJ_REQUEST_PAGES) {
f1a4739f
AE
2529 pages = data_desc;
2530 }
2531
bf0d5f50 2532 while (resid) {
2fa12320 2533 struct ceph_osd_request *osd_req;
bf0d5f50 2534 const char *object_name;
bf0d5f50
AE
2535 u64 offset;
2536 u64 length;
2537
7da22d29 2538 object_name = rbd_segment_name(rbd_dev, img_offset);
bf0d5f50
AE
2539 if (!object_name)
2540 goto out_unwind;
7da22d29
AE
2541 offset = rbd_segment_offset(rbd_dev, img_offset);
2542 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 2543 obj_request = rbd_obj_request_create(object_name,
f1a4739f 2544 offset, length, type);
78c2a44a
AE
2545 /* object request has its own copy of the object name */
2546 rbd_segment_name_free(object_name);
bf0d5f50
AE
2547 if (!obj_request)
2548 goto out_unwind;
62054da6 2549
03507db6
JD
2550 /*
2551 * set obj_request->img_request before creating the
2552 * osd_request so that it gets the right snapc
2553 */
2554 rbd_img_obj_request_add(img_request, obj_request);
bf0d5f50 2555
f1a4739f
AE
2556 if (type == OBJ_REQUEST_BIO) {
2557 unsigned int clone_size;
2558
2559 rbd_assert(length <= (u64)UINT_MAX);
2560 clone_size = (unsigned int)length;
2561 obj_request->bio_list =
2562 bio_chain_clone_range(&bio_list,
2563 &bio_offset,
2564 clone_size,
2224d879 2565 GFP_NOIO);
f1a4739f 2566 if (!obj_request->bio_list)
62054da6 2567 goto out_unwind;
90e98c52 2568 } else if (type == OBJ_REQUEST_PAGES) {
f1a4739f
AE
2569 unsigned int page_count;
2570
2571 obj_request->pages = pages;
2572 page_count = (u32)calc_pages_for(offset, length);
2573 obj_request->page_count = page_count;
2574 if ((offset + length) & ~PAGE_MASK)
2575 page_count--; /* more on last page */
2576 pages += page_count;
2577 }
bf0d5f50 2578
6d2940c8
GZ
2579 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2580 (op_type == OBJ_OP_WRITE) ? 2 : 1,
2581 obj_request);
2fa12320 2582 if (!osd_req)
62054da6 2583 goto out_unwind;
3b434a2a 2584
2fa12320 2585 obj_request->osd_req = osd_req;
2169238d 2586 obj_request->callback = rbd_img_obj_callback;
3b434a2a 2587 obj_request->img_offset = img_offset;
9d4df01f 2588
3b434a2a 2589 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
430c28c3 2590
3b434a2a 2591 rbd_img_request_get(img_request);
bf0d5f50 2592
7da22d29 2593 img_offset += length;
bf0d5f50
AE
2594 resid -= length;
2595 }
2596
2597 return 0;
2598
bf0d5f50
AE
2599out_unwind:
2600 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
42dd037c 2601 rbd_img_obj_request_del(img_request, obj_request);
bf0d5f50
AE
2602
2603 return -ENOMEM;
2604}
2605
0eefd470 2606static void
2761713d 2607rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
0eefd470
AE
2608{
2609 struct rbd_img_request *img_request;
2610 struct rbd_device *rbd_dev;
ebda6408 2611 struct page **pages;
0eefd470
AE
2612 u32 page_count;
2613
2761713d
ID
2614 dout("%s: obj %p\n", __func__, obj_request);
2615
d3246fb0
JD
2616 rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2617 obj_request->type == OBJ_REQUEST_NODATA);
0eefd470
AE
2618 rbd_assert(obj_request_img_data_test(obj_request));
2619 img_request = obj_request->img_request;
2620 rbd_assert(img_request);
2621
2622 rbd_dev = img_request->rbd_dev;
2623 rbd_assert(rbd_dev);
0eefd470 2624
ebda6408
AE
2625 pages = obj_request->copyup_pages;
2626 rbd_assert(pages != NULL);
0eefd470 2627 obj_request->copyup_pages = NULL;
ebda6408
AE
2628 page_count = obj_request->copyup_page_count;
2629 rbd_assert(page_count);
2630 obj_request->copyup_page_count = 0;
2631 ceph_release_page_vector(pages, page_count);
0eefd470
AE
2632
2633 /*
2634 * We want the transfer count to reflect the size of the
2635 * original write request. There is no such thing as a
2636 * successful short write, so if the request was successful
2637 * we can just set it to the originally-requested length.
2638 */
2639 if (!obj_request->result)
2640 obj_request->xferred = obj_request->length;
2641
2761713d 2642 obj_request_done_set(obj_request);
0eefd470
AE
2643}
2644
3d7efd18
AE
2645static void
2646rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2647{
2648 struct rbd_obj_request *orig_request;
0eefd470 2649 struct ceph_osd_request *osd_req;
0eefd470 2650 struct rbd_device *rbd_dev;
3d7efd18 2651 struct page **pages;
d3246fb0 2652 enum obj_operation_type op_type;
ebda6408 2653 u32 page_count;
bbea1c1a 2654 int img_result;
ebda6408 2655 u64 parent_length;
3d7efd18
AE
2656
2657 rbd_assert(img_request_child_test(img_request));
2658
2659 /* First get what we need from the image request */
2660
2661 pages = img_request->copyup_pages;
2662 rbd_assert(pages != NULL);
2663 img_request->copyup_pages = NULL;
ebda6408
AE
2664 page_count = img_request->copyup_page_count;
2665 rbd_assert(page_count);
2666 img_request->copyup_page_count = 0;
3d7efd18
AE
2667
2668 orig_request = img_request->obj_request;
2669 rbd_assert(orig_request != NULL);
b91f09f1 2670 rbd_assert(obj_request_type_valid(orig_request->type));
bbea1c1a 2671 img_result = img_request->result;
ebda6408
AE
2672 parent_length = img_request->length;
2673 rbd_assert(parent_length == img_request->xferred);
91c6febb 2674 rbd_img_request_put(img_request);
3d7efd18 2675
91c6febb
AE
2676 rbd_assert(orig_request->img_request);
2677 rbd_dev = orig_request->img_request->rbd_dev;
0eefd470 2678 rbd_assert(rbd_dev);
0eefd470 2679
bbea1c1a
AE
2680 /*
2681 * If the overlap has become 0 (most likely because the
2682 * image has been flattened) we need to free the pages
2683 * and re-submit the original write request.
2684 */
2685 if (!rbd_dev->parent_overlap) {
bbea1c1a 2686 ceph_release_page_vector(pages, page_count);
980917fc
ID
2687 rbd_obj_request_submit(orig_request);
2688 return;
bbea1c1a 2689 }
0eefd470 2690
bbea1c1a 2691 if (img_result)
0eefd470 2692 goto out_err;
0eefd470 2693
8785b1d4
AE
2694 /*
2695 * The original osd request is of no use to us any more.
0ccd5926 2696 * We need a new one that can hold the three ops in a copyup
8785b1d4
AE
2697 * request. Allocate the new copyup osd request for the
2698 * original request, and release the old one.
2699 */
bbea1c1a 2700 img_result = -ENOMEM;
0eefd470
AE
2701 osd_req = rbd_osd_req_create_copyup(orig_request);
2702 if (!osd_req)
2703 goto out_err;
8785b1d4 2704 rbd_osd_req_destroy(orig_request->osd_req);
0eefd470
AE
2705 orig_request->osd_req = osd_req;
2706 orig_request->copyup_pages = pages;
ebda6408 2707 orig_request->copyup_page_count = page_count;
3d7efd18 2708
0eefd470 2709 /* Initialize the copyup op */
3d7efd18 2710
0eefd470 2711 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
ebda6408 2712 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
0eefd470 2713 false, false);
3d7efd18 2714
d3246fb0 2715 /* Add the other op(s) */
0eefd470 2716
d3246fb0
JD
2717 op_type = rbd_img_request_op_type(orig_request->img_request);
2718 rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
0eefd470
AE
2719
2720 /* All set, send it off. */
2721
980917fc
ID
2722 rbd_obj_request_submit(orig_request);
2723 return;
2724
0eefd470
AE
2725out_err:
2726 /* Record the error code and complete the request */
2727
bbea1c1a 2728 orig_request->result = img_result;
0eefd470
AE
2729 orig_request->xferred = 0;
2730 obj_request_done_set(orig_request);
2731 rbd_obj_request_complete(orig_request);
3d7efd18
AE
2732}
2733
2734/*
2735 * Read from the parent image the range of data that covers the
2736 * entire target of the given object request. This is used for
2737 * satisfying a layered image write request when the target of an
2738 * object request from the image request does not exist.
2739 *
2740 * A page array big enough to hold the returned data is allocated
2741 * and supplied to rbd_img_request_fill() as the "data descriptor."
2742 * When the read completes, this page array will be transferred to
2743 * the original object request for the copyup operation.
2744 *
2745 * If an error occurs, record it as the result of the original
2746 * object request and mark it done so it gets completed.
2747 */
2748static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2749{
058aa991 2750 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
3d7efd18 2751 struct rbd_img_request *parent_request = NULL;
3d7efd18
AE
2752 u64 img_offset;
2753 u64 length;
2754 struct page **pages = NULL;
2755 u32 page_count;
2756 int result;
2757
3d7efd18
AE
2758 rbd_assert(rbd_dev->parent != NULL);
2759
2760 /*
2761 * Determine the byte range covered by the object in the
2762 * child image to which the original request was to be sent.
2763 */
2764 img_offset = obj_request->img_offset - obj_request->offset;
2765 length = (u64)1 << rbd_dev->header.obj_order;
2766
a9e8ba2c
AE
2767 /*
2768 * There is no defined parent data beyond the parent
2769 * overlap, so limit what we read at that boundary if
2770 * necessary.
2771 */
2772 if (img_offset + length > rbd_dev->parent_overlap) {
2773 rbd_assert(img_offset < rbd_dev->parent_overlap);
2774 length = rbd_dev->parent_overlap - img_offset;
2775 }
2776
3d7efd18
AE
2777 /*
2778 * Allocate a page array big enough to receive the data read
2779 * from the parent.
2780 */
2781 page_count = (u32)calc_pages_for(0, length);
2782 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2783 if (IS_ERR(pages)) {
2784 result = PTR_ERR(pages);
2785 pages = NULL;
2786 goto out_err;
2787 }
2788
2789 result = -ENOMEM;
e93f3152
AE
2790 parent_request = rbd_parent_request_create(obj_request,
2791 img_offset, length);
3d7efd18
AE
2792 if (!parent_request)
2793 goto out_err;
3d7efd18
AE
2794
2795 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2796 if (result)
2797 goto out_err;
058aa991 2798
3d7efd18 2799 parent_request->copyup_pages = pages;
ebda6408 2800 parent_request->copyup_page_count = page_count;
3d7efd18 2801 parent_request->callback = rbd_img_obj_parent_read_full_callback;
058aa991 2802
3d7efd18
AE
2803 result = rbd_img_request_submit(parent_request);
2804 if (!result)
2805 return 0;
2806
2807 parent_request->copyup_pages = NULL;
ebda6408 2808 parent_request->copyup_page_count = 0;
3d7efd18
AE
2809 parent_request->obj_request = NULL;
2810 rbd_obj_request_put(obj_request);
2811out_err:
2812 if (pages)
2813 ceph_release_page_vector(pages, page_count);
2814 if (parent_request)
2815 rbd_img_request_put(parent_request);
2816 obj_request->result = result;
2817 obj_request->xferred = 0;
2818 obj_request_done_set(obj_request);
2819
2820 return result;
2821}
2822
c5b5ef6c
AE
2823static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2824{
c5b5ef6c 2825 struct rbd_obj_request *orig_request;
638f5abe 2826 struct rbd_device *rbd_dev;
c5b5ef6c
AE
2827 int result;
2828
2829 rbd_assert(!obj_request_img_data_test(obj_request));
2830
2831 /*
2832 * All we need from the object request is the original
2833 * request and the result of the STAT op. Grab those, then
2834 * we're done with the request.
2835 */
2836 orig_request = obj_request->obj_request;
2837 obj_request->obj_request = NULL;
912c317d 2838 rbd_obj_request_put(orig_request);
c5b5ef6c
AE
2839 rbd_assert(orig_request);
2840 rbd_assert(orig_request->img_request);
2841
2842 result = obj_request->result;
2843 obj_request->result = 0;
2844
2845 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2846 obj_request, orig_request, result,
2847 obj_request->xferred, obj_request->length);
2848 rbd_obj_request_put(obj_request);
2849
638f5abe
AE
2850 /*
2851 * If the overlap has become 0 (most likely because the
980917fc
ID
2852 * image has been flattened) we need to re-submit the
2853 * original request.
638f5abe
AE
2854 */
2855 rbd_dev = orig_request->img_request->rbd_dev;
2856 if (!rbd_dev->parent_overlap) {
980917fc
ID
2857 rbd_obj_request_submit(orig_request);
2858 return;
638f5abe 2859 }
c5b5ef6c
AE
2860
2861 /*
2862 * Our only purpose here is to determine whether the object
2863 * exists, and we don't want to treat the non-existence as
2864 * an error. If something else comes back, transfer the
2865 * error to the original request and complete it now.
2866 */
2867 if (!result) {
2868 obj_request_existence_set(orig_request, true);
2869 } else if (result == -ENOENT) {
2870 obj_request_existence_set(orig_request, false);
2871 } else if (result) {
2872 orig_request->result = result;
3d7efd18 2873 goto out;
c5b5ef6c
AE
2874 }
2875
2876 /*
2877 * Resubmit the original request now that we have recorded
2878 * whether the target object exists.
2879 */
b454e36d 2880 orig_request->result = rbd_img_obj_request_submit(orig_request);
3d7efd18 2881out:
c5b5ef6c
AE
2882 if (orig_request->result)
2883 rbd_obj_request_complete(orig_request);
c5b5ef6c
AE
2884}
2885
2886static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2887{
058aa991 2888 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
c5b5ef6c 2889 struct rbd_obj_request *stat_request;
c5b5ef6c
AE
2890 struct page **pages = NULL;
2891 u32 page_count;
2892 size_t size;
2893 int ret;
2894
2895 /*
2896 * The response data for a STAT call consists of:
2897 * le64 length;
2898 * struct {
2899 * le32 tv_sec;
2900 * le32 tv_nsec;
2901 * } mtime;
2902 */
2903 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2904 page_count = (u32)calc_pages_for(0, size);
2905 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2906 if (IS_ERR(pages))
2907 return PTR_ERR(pages);
2908
2909 ret = -ENOMEM;
2910 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2911 OBJ_REQUEST_PAGES);
2912 if (!stat_request)
2913 goto out;
2914
2915 rbd_obj_request_get(obj_request);
2916 stat_request->obj_request = obj_request;
2917 stat_request->pages = pages;
2918 stat_request->page_count = page_count;
2919
6d2940c8 2920 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
deb236b3 2921 stat_request);
c5b5ef6c
AE
2922 if (!stat_request->osd_req)
2923 goto out;
2924 stat_request->callback = rbd_img_obj_exists_callback;
2925
144cba14 2926 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
c5b5ef6c
AE
2927 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2928 false, false);
9d4df01f 2929 rbd_osd_req_format_read(stat_request);
c5b5ef6c 2930
980917fc
ID
2931 rbd_obj_request_submit(stat_request);
2932 return 0;
2933
c5b5ef6c
AE
2934out:
2935 if (ret)
2936 rbd_obj_request_put(obj_request);
2937
2938 return ret;
2939}
2940
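/*
 * Return true if the object request can be submitted directly to the
 * osd, i.e. it needs neither an existence check nor a copyup of
 * parent data first.
 */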
70d045f6 2941static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
b454e36d 2942{
058aa991
ID
2943 struct rbd_img_request *img_request = obj_request->img_request;
2944 struct rbd_device *rbd_dev = img_request->rbd_dev;
b454e36d 2945
70d045f6 2946 /* Reads */
1c220881
JD
2947 if (!img_request_write_test(img_request) &&
2948 !img_request_discard_test(img_request))
70d045f6
ID
2949 return true;
2950
2951 /* Non-layered writes */
2952 if (!img_request_layered_test(img_request))
2953 return true;
2954
b454e36d 2955 /*
70d045f6
ID
2956 * Layered writes outside of the parent overlap range don't
2957 * share any data with the parent.
b454e36d 2958 */
70d045f6
ID
2959 if (!obj_request_overlaps_parent(obj_request))
2960 return true;
b454e36d 2961
c622d226
GZ
2962 /*
2963 * Entire-object layered writes - we will overwrite whatever
2964 * parent data there is anyway.
2965 */
2966 if (!obj_request->offset &&
2967 obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2968 return true;
2969
70d045f6
ID
2970 /*
2971 * If the object is known to already exist, its parent data has
2972 * already been copied.
2973 */
2974 if (obj_request_known_test(obj_request) &&
2975 obj_request_exists_test(obj_request))
2976 return true;
2977
2978 return false;
2979}
2980
2981static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2982{
058aa991
ID
2983 rbd_assert(obj_request_img_data_test(obj_request));
2984 rbd_assert(obj_request_type_valid(obj_request->type));
2985 rbd_assert(obj_request->img_request);
2986
70d045f6 2987 if (img_obj_request_simple(obj_request)) {
980917fc
ID
2988 rbd_obj_request_submit(obj_request);
2989 return 0;
b454e36d
AE
2990 }
2991
2992 /*
3d7efd18
AE
2993 * It's a layered write. The target object might exist but
2994 * we may not know that yet. If we know it doesn't exist,
2995 * start by reading the data for the full target object from
2996 * the parent so we can use it for a copyup to the target.
b454e36d 2997 */
70d045f6 2998 if (obj_request_known_test(obj_request))
3d7efd18
AE
2999 return rbd_img_obj_parent_read_full(obj_request);
3000
3001 /* We don't know whether the target exists. Go find out. */
b454e36d
AE
3002
3003 return rbd_img_obj_exists_submit(obj_request);
3004}
3005
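/* Submit all object requests that make up the given image request */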
bf0d5f50
AE
3006static int rbd_img_request_submit(struct rbd_img_request *img_request)
3007{
bf0d5f50 3008 struct rbd_obj_request *obj_request;
46faeed4 3009 struct rbd_obj_request *next_obj_request;
663ae2cc 3010 int ret = 0;
bf0d5f50 3011
37206ee5 3012 dout("%s: img %p\n", __func__, img_request);
bf0d5f50 3013
663ae2cc
ID
3014 rbd_img_request_get(img_request);
3015 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
b454e36d 3016 ret = rbd_img_obj_request_submit(obj_request);
bf0d5f50 3017 if (ret)
663ae2cc 3018 goto out_put_ireq;
bf0d5f50
AE
3019 }
3020
663ae2cc
ID
3021out_put_ireq:
3022 rbd_img_request_put(img_request);
3023 return ret;
bf0d5f50 3024}
8b3e1a56
AE
3025
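/*
 * Completion callback for a read sent to the parent image on behalf of
 * a child object request: propagate the result, clipping the transfer
 * count at the parent overlap so anything beyond it gets zero-filled.
 */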
3026static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
3027{
3028 struct rbd_obj_request *obj_request;
a9e8ba2c
AE
3029 struct rbd_device *rbd_dev;
3030 u64 obj_end;
02c74fba
AE
3031 u64 img_xferred;
3032 int img_result;
8b3e1a56
AE
3033
3034 rbd_assert(img_request_child_test(img_request));
3035
02c74fba
AE
3036 /* First get what we need from the image request and release it */
3037
8b3e1a56 3038 obj_request = img_request->obj_request;
02c74fba
AE
3039 img_xferred = img_request->xferred;
3040 img_result = img_request->result;
3041 rbd_img_request_put(img_request);
3042
3043 /*
3044 * If the overlap has become 0 (most likely because the
3045 * image has been flattened) we need to re-submit the
3046 * original request.
3047 */
a9e8ba2c
AE
3048 rbd_assert(obj_request);
3049 rbd_assert(obj_request->img_request);
02c74fba
AE
3050 rbd_dev = obj_request->img_request->rbd_dev;
3051 if (!rbd_dev->parent_overlap) {
980917fc
ID
3052 rbd_obj_request_submit(obj_request);
3053 return;
02c74fba 3054 }
a9e8ba2c 3055
02c74fba 3056 obj_request->result = img_result;
a9e8ba2c
AE
3057 if (obj_request->result)
3058 goto out;
3059
3060 /*
3061 * We need to zero anything beyond the parent overlap
3062 * boundary. Since rbd_img_obj_request_read_callback()
3063 * will zero anything beyond the end of a short read, an
3064 * easy way to do this is to pretend the data from the
3065 * parent came up short--ending at the overlap boundary.
3066 */
3067 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
3068 obj_end = obj_request->img_offset + obj_request->length;
a9e8ba2c
AE
3069 if (obj_end > rbd_dev->parent_overlap) {
3070 u64 xferred = 0;
3071
3072 if (obj_request->img_offset < rbd_dev->parent_overlap)
3073 xferred = rbd_dev->parent_overlap -
3074 obj_request->img_offset;
8b3e1a56 3075
02c74fba 3076 obj_request->xferred = min(img_xferred, xferred);
a9e8ba2c 3077 } else {
02c74fba 3078 obj_request->xferred = img_xferred;
a9e8ba2c
AE
3079 }
3080out:
8b3e1a56
AE
3081 rbd_img_obj_request_read_callback(obj_request);
3082 rbd_obj_request_complete(obj_request);
3083}
3084
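/*
 * Satisfy a read that found no object in the child image by reading
 * the corresponding range from the parent image.
 */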
3085static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
3086{
8b3e1a56
AE
3087 struct rbd_img_request *img_request;
3088 int result;
3089
3090 rbd_assert(obj_request_img_data_test(obj_request));
3091 rbd_assert(obj_request->img_request != NULL);
3092 rbd_assert(obj_request->result == (s32) -ENOENT);
5b2ab72d 3093 rbd_assert(obj_request_type_valid(obj_request->type));
8b3e1a56 3094
8b3e1a56 3095 /* rbd_read_finish(obj_request, obj_request->length); */
e93f3152 3096 img_request = rbd_parent_request_create(obj_request,
8b3e1a56 3097 obj_request->img_offset,
e93f3152 3098 obj_request->length);
8b3e1a56
AE
3099 result = -ENOMEM;
3100 if (!img_request)
3101 goto out_err;
3102
5b2ab72d
AE
3103 if (obj_request->type == OBJ_REQUEST_BIO)
3104 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3105 obj_request->bio_list);
3106 else
3107 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3108 obj_request->pages);
8b3e1a56
AE
3109 if (result)
3110 goto out_err;
3111
3112 img_request->callback = rbd_img_parent_read_callback;
3113 result = rbd_img_request_submit(img_request);
3114 if (result)
3115 goto out_err;
3116
3117 return;
3118out_err:
3119 if (img_request)
3120 rbd_img_request_put(img_request);
3121 obj_request->result = result;
3122 obj_request->xferred = 0;
3123 obj_request_done_set(obj_request);
3124}
bf0d5f50 3125
ed95b21a
ID
3126static const struct rbd_client_id rbd_empty_cid;
3127
3128static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3129 const struct rbd_client_id *rhs)
3130{
3131 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3132}
3133
3134static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3135{
3136 struct rbd_client_id cid;
3137
3138 mutex_lock(&rbd_dev->watch_mutex);
3139 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3140 cid.handle = rbd_dev->watch_cookie;
3141 mutex_unlock(&rbd_dev->watch_mutex);
3142 return cid;
3143}
3144
3145/*
3146 * lock_rwsem must be held for write
3147 */
3148static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3149 const struct rbd_client_id *cid)
3150{
3151 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3152 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3153 cid->gid, cid->handle);
3154 rbd_dev->owner_cid = *cid; /* struct */
3155}
3156
3157static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3158{
3159 mutex_lock(&rbd_dev->watch_mutex);
3160 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3161 mutex_unlock(&rbd_dev->watch_mutex);
3162}
3163
3164/*
3165 * lock_rwsem must be held for write
3166 */
3167static int rbd_lock(struct rbd_device *rbd_dev)
3168{
3169 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3170 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3171 char cookie[32];
3172 int ret;
3173
3174 WARN_ON(__rbd_is_lock_owner(rbd_dev));
3175
3176 format_lock_cookie(rbd_dev, cookie);
3177 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3178 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3179 RBD_LOCK_TAG, "", 0);
3180 if (ret)
3181 return ret;
3182
3183 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3184 rbd_set_owner_cid(rbd_dev, &cid);
3185 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3186 return 0;
3187}
3188
3189/*
3190 * lock_rwsem must be held for write
3191 */
3192static int rbd_unlock(struct rbd_device *rbd_dev)
b8d70035 3193{
922dab61 3194 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a 3195 char cookie[32];
e627db08 3196 int ret;
b8d70035 3197
ed95b21a
ID
3198 WARN_ON(!__rbd_is_lock_owner(rbd_dev));
3199
3200 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3201
3202 format_lock_cookie(rbd_dev, cookie);
3203 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3204 RBD_LOCK_NAME, cookie);
3205 if (ret && ret != -ENOENT) {
3206 rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
3207 return ret;
3208 }
3209
3210 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3211 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3212 return 0;
3213}
3214
3215static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3216 enum rbd_notify_op notify_op,
3217 struct page ***preply_pages,
3218 size_t *preply_len)
3219{
3220 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3221 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3222 int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3223 char buf[buf_size];
3224 void *p = buf;
3225
3226 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3227
3228 /* encode *LockPayload NotifyMessage (op + ClientId) */
3229 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3230 ceph_encode_32(&p, notify_op);
3231 ceph_encode_64(&p, cid.gid);
3232 ceph_encode_64(&p, cid.handle);
3233
3234 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3235 &rbd_dev->header_oloc, buf, buf_size,
3236 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3237}
3238
3239static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3240 enum rbd_notify_op notify_op)
3241{
3242 struct page **reply_pages;
3243 size_t reply_len;
3244
3245 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3246 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3247}
3248
3249static void rbd_notify_acquired_lock(struct work_struct *work)
3250{
3251 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3252 acquired_lock_work);
3253
3254 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3255}
3256
3257static void rbd_notify_released_lock(struct work_struct *work)
3258{
3259 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3260 released_lock_work);
3261
3262 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3263}
3264
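/*
 * Ask the current lock owner to release the exclusive lock by sending
 * a REQUEST_LOCK notify. Returns the owner's decoded response, or
 * -ETIMEDOUT if no lock owner answered.
 */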
3265static int rbd_request_lock(struct rbd_device *rbd_dev)
3266{
3267 struct page **reply_pages;
3268 size_t reply_len;
3269 bool lock_owner_responded = false;
3270 int ret;
52bb1f9b 3271
ed95b21a
ID
3272 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3273
3274 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3275 &reply_pages, &reply_len);
3276 if (ret && ret != -ETIMEDOUT) {
3277 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3278 goto out;
3279 }
3280
3281 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3282 void *p = page_address(reply_pages[0]);
3283 void *const end = p + reply_len;
3284 u32 n;
3285
3286 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3287 while (n--) {
3288 u8 struct_v;
3289 u32 len;
3290
3291 ceph_decode_need(&p, end, 8 + 8, e_inval);
3292 p += 8 + 8; /* skip gid and cookie */
3293
3294 ceph_decode_32_safe(&p, end, len, e_inval);
3295 if (!len)
3296 continue;
3297
3298 if (lock_owner_responded) {
3299 rbd_warn(rbd_dev,
3300 "duplicate lock owners detected");
3301 ret = -EIO;
3302 goto out;
3303 }
3304
3305 lock_owner_responded = true;
3306 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3307 &struct_v, &len);
3308 if (ret) {
3309 rbd_warn(rbd_dev,
3310 "failed to decode ResponseMessage: %d",
3311 ret);
3312 goto e_inval;
3313 }
3314
3315 ret = ceph_decode_32(&p);
3316 }
3317 }
3318
3319 if (!lock_owner_responded) {
3320 rbd_warn(rbd_dev, "no lock owners detected");
3321 ret = -ETIMEDOUT;
3322 }
3323
3324out:
3325 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3326 return ret;
3327
3328e_inval:
3329 ret = -EINVAL;
3330 goto out;
3331}
3332
3333static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3334{
3335 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3336
3337 cancel_delayed_work(&rbd_dev->lock_dwork);
3338 if (wake_all)
3339 wake_up_all(&rbd_dev->lock_waitq);
3340 else
3341 wake_up(&rbd_dev->lock_waitq);
3342}
3343
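/*
 * Fetch the current lockers of the header object and verify that the
 * lock was taken by rbd (exclusive type, rbd tag and cookie prefix).
 */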
3344static int get_lock_owner_info(struct rbd_device *rbd_dev,
3345 struct ceph_locker **lockers, u32 *num_lockers)
3346{
3347 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3348 u8 lock_type;
3349 char *lock_tag;
3350 int ret;
3351
3352 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3353
3354 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3355 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3356 &lock_type, &lock_tag, lockers, num_lockers);
3357 if (ret)
3358 return ret;
3359
3360 if (*num_lockers == 0) {
3361 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3362 goto out;
3363 }
3364
3365 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3366 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3367 lock_tag);
3368 ret = -EBUSY;
3369 goto out;
3370 }
3371
3372 if (lock_type == CEPH_CLS_LOCK_SHARED) {
3373 rbd_warn(rbd_dev, "shared lock type detected");
3374 ret = -EBUSY;
3375 goto out;
3376 }
3377
3378 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3379 strlen(RBD_LOCK_COOKIE_PREFIX))) {
3380 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3381 (*lockers)[0].id.cookie);
3382 ret = -EBUSY;
3383 goto out;
3384 }
3385
3386out:
3387 kfree(lock_tag);
3388 return ret;
3389}
3390
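/*
 * Check whether the given locker still has a watch established on the
 * header object. Returns 1 if it does (recording its client id), 0 if
 * not, or a negative error code.
 */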
3391static int find_watcher(struct rbd_device *rbd_dev,
3392 const struct ceph_locker *locker)
3393{
3394 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3395 struct ceph_watch_item *watchers;
3396 u32 num_watchers;
3397 u64 cookie;
3398 int i;
3399 int ret;
3400
3401 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3402 &rbd_dev->header_oloc, &watchers,
3403 &num_watchers);
3404 if (ret)
3405 return ret;
3406
3407 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3408 for (i = 0; i < num_watchers; i++) {
3409 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3410 sizeof(locker->info.addr)) &&
3411 watchers[i].cookie == cookie) {
3412 struct rbd_client_id cid = {
3413 .gid = le64_to_cpu(watchers[i].name.num),
3414 .handle = cookie,
3415 };
3416
3417 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3418 rbd_dev, cid.gid, cid.handle);
3419 rbd_set_owner_cid(rbd_dev, &cid);
3420 ret = 1;
3421 goto out;
3422 }
3423 }
3424
3425 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3426 ret = 0;
3427out:
3428 kfree(watchers);
3429 return ret;
3430}
3431
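/*
 * Try to take the exclusive lock.  If it is held by a client that no
 * longer has a watch on the header object, blacklist that client,
 * break its lock and retry.
 */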
3432/*
3433 * lock_rwsem must be held for write
3434 */
3435static int rbd_try_lock(struct rbd_device *rbd_dev)
3436{
3437 struct ceph_client *client = rbd_dev->rbd_client->client;
3438 struct ceph_locker *lockers;
3439 u32 num_lockers;
3440 int ret;
3441
3442 for (;;) {
3443 ret = rbd_lock(rbd_dev);
3444 if (ret != -EBUSY)
3445 return ret;
3446
3447 /* determine if the current lock holder is still alive */
3448 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3449 if (ret)
3450 return ret;
3451
3452 if (num_lockers == 0)
3453 goto again;
3454
3455 ret = find_watcher(rbd_dev, lockers);
3456 if (ret) {
3457 if (ret > 0)
3458 ret = 0; /* have to request lock */
3459 goto out;
3460 }
3461
3462 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3463 ENTITY_NAME(lockers[0].id.name));
3464
3465 ret = ceph_monc_blacklist_add(&client->monc,
3466 &lockers[0].info.addr);
3467 if (ret) {
3468 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3469 ENTITY_NAME(lockers[0].id.name), ret);
3470 goto out;
3471 }
3472
3473 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3474 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3475 lockers[0].id.cookie,
3476 &lockers[0].id.name);
3477 if (ret && ret != -ENOENT)
3478 goto out;
3479
3480again:
3481 ceph_free_lockers(lockers, num_lockers);
3482 }
3483
3484out:
3485 ceph_free_lockers(lockers, num_lockers);
3486 return ret;
3487}
3488
3489/*
3490 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3491 */
3492static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3493 int *pret)
3494{
3495 enum rbd_lock_state lock_state;
3496
3497 down_read(&rbd_dev->lock_rwsem);
3498 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3499 rbd_dev->lock_state);
3500 if (__rbd_is_lock_owner(rbd_dev)) {
3501 lock_state = rbd_dev->lock_state;
3502 up_read(&rbd_dev->lock_rwsem);
3503 return lock_state;
3504 }
3505
3506 up_read(&rbd_dev->lock_rwsem);
3507 down_write(&rbd_dev->lock_rwsem);
3508 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3509 rbd_dev->lock_state);
3510 if (!__rbd_is_lock_owner(rbd_dev)) {
3511 *pret = rbd_try_lock(rbd_dev);
3512 if (*pret)
3513 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3514 }
3515
3516 lock_state = rbd_dev->lock_state;
3517 up_write(&rbd_dev->lock_rwsem);
3518 return lock_state;
3519}
3520
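/*
 * Delayed work: try to acquire the exclusive lock.  If another client
 * holds it, ask it to release the lock and re-queue ourselves until
 * the lock is acquired or this client is blacklisted.
 */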
3521static void rbd_acquire_lock(struct work_struct *work)
3522{
3523 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3524 struct rbd_device, lock_dwork);
3525 enum rbd_lock_state lock_state;
3526 int ret;
3527
3528 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3529again:
3530 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3531 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3532 if (lock_state == RBD_LOCK_STATE_LOCKED)
3533 wake_requests(rbd_dev, true);
3534 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3535 rbd_dev, lock_state, ret);
3536 return;
3537 }
3538
3539 ret = rbd_request_lock(rbd_dev);
3540 if (ret == -ETIMEDOUT) {
3541 goto again; /* treat this as a dead client */
3542 } else if (ret < 0) {
3543 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3544 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3545 RBD_RETRY_DELAY);
3546 } else {
3547 /*
3548 * lock owner acked, but resend if we don't see them
3549 * release the lock
3550 */
3551 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3552 rbd_dev);
3553 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3554 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3555 }
3556}
3557
3558/*
3559 * lock_rwsem must be held for write
3560 */
3561static bool rbd_release_lock(struct rbd_device *rbd_dev)
3562{
3563 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3564 rbd_dev->lock_state);
3565 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3566 return false;
3567
3568 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3569 downgrade_write(&rbd_dev->lock_rwsem);
52bb1f9b 3570 /*
ed95b21a 3571 * Ensure that all in-flight IO is flushed.
52bb1f9b 3572 *
ed95b21a
ID
3573 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3574 * may be shared with other devices.
52bb1f9b 3575 */
ed95b21a
ID
3576 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3577 up_read(&rbd_dev->lock_rwsem);
3578
3579 down_write(&rbd_dev->lock_rwsem);
3580 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3581 rbd_dev->lock_state);
3582 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3583 return false;
3584
3585 if (!rbd_unlock(rbd_dev))
3586 /*
3587 * Give others a chance to grab the lock - we would re-acquire
3588 * almost immediately if we got new IO during ceph_osdc_sync()
3589 * otherwise. We need to ack our own notifications, so this
3590 * lock_dwork will be requeued from rbd_wait_state_locked()
3591 * after wake_requests() in rbd_handle_released_lock().
3592 */
3593 cancel_delayed_work(&rbd_dev->lock_dwork);
3594
3595 return true;
3596}
3597
3598static void rbd_release_lock_work(struct work_struct *work)
3599{
3600 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3601 unlock_work);
3602
3603 down_write(&rbd_dev->lock_rwsem);
3604 rbd_release_lock(rbd_dev);
3605 up_write(&rbd_dev->lock_rwsem);
3606}
3607
3608static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3609 void **p)
3610{
3611 struct rbd_client_id cid = { 0 };
3612
3613 if (struct_v >= 2) {
3614 cid.gid = ceph_decode_64(p);
3615 cid.handle = ceph_decode_64(p);
3616 }
3617
3618 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3619 cid.handle);
3620 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3621 down_write(&rbd_dev->lock_rwsem);
3622 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3623 /*
3624 * we already know that the remote client is
3625 * the owner
3626 */
3627 up_write(&rbd_dev->lock_rwsem);
3628 return;
3629 }
3630
3631 rbd_set_owner_cid(rbd_dev, &cid);
3632 downgrade_write(&rbd_dev->lock_rwsem);
3633 } else {
3634 down_read(&rbd_dev->lock_rwsem);
3635 }
3636
3637 if (!__rbd_is_lock_owner(rbd_dev))
3638 wake_requests(rbd_dev, false);
3639 up_read(&rbd_dev->lock_rwsem);
3640}
3641
3642static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3643 void **p)
3644{
3645 struct rbd_client_id cid = { 0 };
3646
3647 if (struct_v >= 2) {
3648 cid.gid = ceph_decode_64(p);
3649 cid.handle = ceph_decode_64(p);
3650 }
3651
3652 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3653 cid.handle);
3654 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3655 down_write(&rbd_dev->lock_rwsem);
3656 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3657 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3658 __func__, rbd_dev, cid.gid, cid.handle,
3659 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3660 up_write(&rbd_dev->lock_rwsem);
3661 return;
3662 }
3663
3664 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3665 downgrade_write(&rbd_dev->lock_rwsem);
3666 } else {
3667 down_read(&rbd_dev->lock_rwsem);
3668 }
3669
3670 if (!__rbd_is_lock_owner(rbd_dev))
3671 wake_requests(rbd_dev, false);
3672 up_read(&rbd_dev->lock_rwsem);
3673}
3674
3675static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3676 void **p)
3677{
3678 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3679 struct rbd_client_id cid = { 0 };
3680 bool need_to_send;
3681
3682 if (struct_v >= 2) {
3683 cid.gid = ceph_decode_64(p);
3684 cid.handle = ceph_decode_64(p);
3685 }
3686
3687 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3688 cid.handle);
3689 if (rbd_cid_equal(&cid, &my_cid))
3690 return false;
3691
3692 down_read(&rbd_dev->lock_rwsem);
3693 need_to_send = __rbd_is_lock_owner(rbd_dev);
3694 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3695 if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
3696 dout("%s rbd_dev %p queueing unlock_work\n", __func__,
3697 rbd_dev);
3698 queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
3699 }
3700 }
3701 up_read(&rbd_dev->lock_rwsem);
3702 return need_to_send;
3703}
3704
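/*
 * Ack a notify, optionally embedding an encoded ResponseMessage that
 * carries @result.
 */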
3705static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3706 u64 notify_id, u64 cookie, s32 *result)
3707{
3708 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3709 int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3710 char buf[buf_size];
3711 int ret;
3712
3713 if (result) {
3714 void *p = buf;
3715
3716 /* encode ResponseMessage */
3717 ceph_start_encoding(&p, 1, 1,
3718 buf_size - CEPH_ENCODING_START_BLK_LEN);
3719 ceph_encode_32(&p, *result);
3720 } else {
3721 buf_size = 0;
3722 }
b8d70035 3723
922dab61
ID
3724 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3725 &rbd_dev->header_oloc, notify_id, cookie,
ed95b21a 3726 buf, buf_size);
52bb1f9b 3727 if (ret)
ed95b21a
ID
3728 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3729}
3730
3731static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3732 u64 cookie)
3733{
3734 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3735 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3736}
3737
3738static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3739 u64 notify_id, u64 cookie, s32 result)
3740{
3741 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3742 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3743}
3744
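/*
 * Watch callback for the header object: dispatches lock
 * acquired/released/request notifications and header updates, and
 * acks every notify.
 */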
3745static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3746 u64 notifier_id, void *data, size_t data_len)
3747{
3748 struct rbd_device *rbd_dev = arg;
3749 void *p = data;
3750 void *const end = p + data_len;
3751 u8 struct_v;
3752 u32 len;
3753 u32 notify_op;
3754 int ret;
3755
3756 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3757 __func__, rbd_dev, cookie, notify_id, data_len);
3758 if (data_len) {
3759 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3760 &struct_v, &len);
3761 if (ret) {
3762 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3763 ret);
3764 return;
3765 }
3766
3767 notify_op = ceph_decode_32(&p);
3768 } else {
3769 /* legacy notification for header updates */
3770 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3771 len = 0;
3772 }
3773
3774 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3775 switch (notify_op) {
3776 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3777 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3778 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3779 break;
3780 case RBD_NOTIFY_OP_RELEASED_LOCK:
3781 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3782 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3783 break;
3784 case RBD_NOTIFY_OP_REQUEST_LOCK:
3785 if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
3786 /*
3787 * send ResponseMessage(0) back so the client
3788 * can detect a missing owner
3789 */
3790 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3791 cookie, 0);
3792 else
3793 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3794 break;
3795 case RBD_NOTIFY_OP_HEADER_UPDATE:
3796 ret = rbd_dev_refresh(rbd_dev);
3797 if (ret)
3798 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3799
3800 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3801 break;
3802 default:
3803 if (rbd_is_lock_owner(rbd_dev))
3804 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3805 cookie, -EOPNOTSUPP);
3806 else
3807 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3808 break;
3809 }
b8d70035
AE
3810}
3811
99d16943
ID
3812static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3813
922dab61 3814static void rbd_watch_errcb(void *arg, u64 cookie, int err)
bb040aa0 3815{
922dab61 3816 struct rbd_device *rbd_dev = arg;
bb040aa0 3817
922dab61 3818 rbd_warn(rbd_dev, "encountered watch error: %d", err);
bb040aa0 3819
ed95b21a
ID
3820 down_write(&rbd_dev->lock_rwsem);
3821 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3822 up_write(&rbd_dev->lock_rwsem);
3823
99d16943
ID
3824 mutex_lock(&rbd_dev->watch_mutex);
3825 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3826 __rbd_unregister_watch(rbd_dev);
3827 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
bb040aa0 3828
99d16943 3829 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
bb040aa0 3830 }
99d16943 3831 mutex_unlock(&rbd_dev->watch_mutex);
bb040aa0
ID
3832}
3833
9969ebc5 3834/*
99d16943 3835 * watch_mutex must be locked
9969ebc5 3836 */
99d16943 3837static int __rbd_register_watch(struct rbd_device *rbd_dev)
9969ebc5
AE
3838{
3839 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
922dab61 3840 struct ceph_osd_linger_request *handle;
9969ebc5 3841
922dab61 3842 rbd_assert(!rbd_dev->watch_handle);
99d16943 3843 dout("%s rbd_dev %p\n", __func__, rbd_dev);
9969ebc5 3844
922dab61
ID
3845 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3846 &rbd_dev->header_oloc, rbd_watch_cb,
3847 rbd_watch_errcb, rbd_dev);
3848 if (IS_ERR(handle))
3849 return PTR_ERR(handle);
8eb87565 3850
922dab61 3851 rbd_dev->watch_handle = handle;
b30a01f2 3852 return 0;
b30a01f2
ID
3853}
3854
99d16943
ID
3855/*
3856 * watch_mutex must be locked
3857 */
3858static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
b30a01f2 3859{
922dab61
ID
3860 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3861 int ret;
b30a01f2 3862
99d16943
ID
3863 rbd_assert(rbd_dev->watch_handle);
3864 dout("%s rbd_dev %p\n", __func__, rbd_dev);
b30a01f2 3865
922dab61
ID
3866 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3867 if (ret)
3868 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
76756a51 3869
922dab61 3870 rbd_dev->watch_handle = NULL;
c525f036
ID
3871}
3872
99d16943
ID
3873static int rbd_register_watch(struct rbd_device *rbd_dev)
3874{
3875 int ret;
3876
3877 mutex_lock(&rbd_dev->watch_mutex);
3878 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3879 ret = __rbd_register_watch(rbd_dev);
3880 if (ret)
3881 goto out;
3882
3883 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3884 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3885
3886out:
3887 mutex_unlock(&rbd_dev->watch_mutex);
3888 return ret;
3889}
3890
3891static void cancel_tasks_sync(struct rbd_device *rbd_dev)
c525f036 3892{
99d16943
ID
3893 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3894
3895 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
ed95b21a
ID
3896 cancel_work_sync(&rbd_dev->acquired_lock_work);
3897 cancel_work_sync(&rbd_dev->released_lock_work);
3898 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3899 cancel_work_sync(&rbd_dev->unlock_work);
99d16943
ID
3900}
3901
3902static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3903{
ed95b21a 3904 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
99d16943
ID
3905 cancel_tasks_sync(rbd_dev);
3906
3907 mutex_lock(&rbd_dev->watch_mutex);
3908 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3909 __rbd_unregister_watch(rbd_dev);
3910 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3911 mutex_unlock(&rbd_dev->watch_mutex);
811c6688 3912
811c6688 3913 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
fca27065
ID
3914}
3915
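/*
 * Delayed work: re-establish the watch after an error, refresh the
 * header and, if the exclusive lock was held, try to re-acquire it.
 */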
99d16943
ID
3916static void rbd_reregister_watch(struct work_struct *work)
3917{
3918 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3919 struct rbd_device, watch_dwork);
ed95b21a 3920 bool was_lock_owner = false;
99d16943
ID
3921 int ret;
3922
3923 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3924
ed95b21a
ID
3925 down_write(&rbd_dev->lock_rwsem);
3926 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3927 was_lock_owner = rbd_release_lock(rbd_dev);
3928
99d16943
ID
3929 mutex_lock(&rbd_dev->watch_mutex);
3930 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR)
3931 goto fail_unlock;
3932
3933 ret = __rbd_register_watch(rbd_dev);
3934 if (ret) {
3935 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3936 if (ret != -EBLACKLISTED)
3937 queue_delayed_work(rbd_dev->task_wq,
3938 &rbd_dev->watch_dwork,
3939 RBD_RETRY_DELAY);
3940 goto fail_unlock;
3941 }
3942
3943 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3944 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3945 mutex_unlock(&rbd_dev->watch_mutex);
3946
3947 ret = rbd_dev_refresh(rbd_dev);
3948 if (ret)
3949 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3950
ed95b21a
ID
3951 if (was_lock_owner) {
3952 ret = rbd_try_lock(rbd_dev);
3953 if (ret)
3954 rbd_warn(rbd_dev, "reregistration lock failed: %d",
3955 ret);
3956 }
3957
3958 up_write(&rbd_dev->lock_rwsem);
3959 wake_requests(rbd_dev, true);
99d16943
ID
3960 return;
3961
3962fail_unlock:
3963 mutex_unlock(&rbd_dev->watch_mutex);
ed95b21a 3964 up_write(&rbd_dev->lock_rwsem);
99d16943
ID
3965}
3966
36be9a76 3967/*
f40eb349
AE
3968 * Synchronous osd object method call. Returns the number of bytes
3969 * returned in the inbound buffer, or a negative error code.
36be9a76
AE
3970 */
3971static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3972 const char *object_name,
3973 const char *class_name,
3974 const char *method_name,
4157976b 3975 const void *outbound,
36be9a76 3976 size_t outbound_size,
4157976b 3977 void *inbound,
e2a58ee5 3978 size_t inbound_size)
36be9a76
AE
3979{
3980 struct rbd_obj_request *obj_request;
36be9a76
AE
3981 struct page **pages;
3982 u32 page_count;
3983 int ret;
3984
3985 /*
6010a451
AE
3986 * Method calls are ultimately read operations. The result
3987 * should be placed into the inbound buffer provided. They
3988 * also supply outbound data--parameters for the object
3989 * method. Currently if this is present it will be a
3990 * snapshot id.
36be9a76 3991 */
57385b51 3992 page_count = (u32)calc_pages_for(0, inbound_size);
36be9a76
AE
3993 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3994 if (IS_ERR(pages))
3995 return PTR_ERR(pages);
3996
3997 ret = -ENOMEM;
6010a451 3998 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
36be9a76
AE
3999 OBJ_REQUEST_PAGES);
4000 if (!obj_request)
4001 goto out;
4002
4003 obj_request->pages = pages;
4004 obj_request->page_count = page_count;
4005
6d2940c8 4006 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
deb236b3 4007 obj_request);
36be9a76
AE
4008 if (!obj_request->osd_req)
4009 goto out;
4010
c99d2d4a 4011 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
04017e29
AE
4012 class_name, method_name);
4013 if (outbound_size) {
4014 struct ceph_pagelist *pagelist;
4015
4016 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
4017 if (!pagelist)
4018 goto out;
4019
4020 ceph_pagelist_init(pagelist);
4021 ceph_pagelist_append(pagelist, outbound, outbound_size);
4022 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
4023 pagelist);
4024 }
a4ce40a9
AE
4025 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
4026 obj_request->pages, inbound_size,
44cd188d 4027 0, false, false);
9d4df01f 4028 rbd_osd_req_format_read(obj_request);
430c28c3 4029
980917fc 4030 rbd_obj_request_submit(obj_request);
36be9a76
AE
4031 ret = rbd_obj_request_wait(obj_request);
4032 if (ret)
4033 goto out;
4034
4035 ret = obj_request->result;
4036 if (ret < 0)
4037 goto out;
57385b51
AE
4038
4039 rbd_assert(obj_request->xferred < (u64)INT_MAX);
4040 ret = (int)obj_request->xferred;
903bb32e 4041 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
36be9a76
AE
4042out:
4043 if (obj_request)
4044 rbd_obj_request_put(obj_request);
4045 else
4046 ceph_release_page_vector(pages, page_count);
4047
4048 return ret;
4049}
4050
ed95b21a
ID
4051/*
4052 * lock_rwsem must be held for read
4053 */
4054static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
4055{
4056 DEFINE_WAIT(wait);
4057
4058 do {
4059 /*
4060 * Note the use of mod_delayed_work() in rbd_acquire_lock()
4061 * and cancel_delayed_work() in wake_requests().
4062 */
4063 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4064 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4065 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4066 TASK_UNINTERRUPTIBLE);
4067 up_read(&rbd_dev->lock_rwsem);
4068 schedule();
4069 down_read(&rbd_dev->lock_rwsem);
4070 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
4071 finish_wait(&rbd_dev->lock_waitq, &wait);
4072}
4073
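/*
 * Per-request worker for the blk-mq path: validate the request, take
 * the exclusive lock if required, then build and submit the image
 * request.
 */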
7ad18afa 4074static void rbd_queue_workfn(struct work_struct *work)
bf0d5f50 4075{
7ad18afa
CH
4076 struct request *rq = blk_mq_rq_from_pdu(work);
4077 struct rbd_device *rbd_dev = rq->q->queuedata;
bc1ecc65 4078 struct rbd_img_request *img_request;
4e752f0a 4079 struct ceph_snap_context *snapc = NULL;
bc1ecc65
ID
4080 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4081 u64 length = blk_rq_bytes(rq);
6d2940c8 4082 enum obj_operation_type op_type;
4e752f0a 4083 u64 mapping_size;
80de1912 4084 bool must_be_locked;
bf0d5f50
AE
4085 int result;
4086
7ad18afa
CH
4087 if (rq->cmd_type != REQ_TYPE_FS) {
4088 dout("%s: non-fs request type %d\n", __func__,
4089 (int) rq->cmd_type);
4090 result = -EIO;
4091 goto err;
4092 }
4093
c2df40df 4094 if (req_op(rq) == REQ_OP_DISCARD)
90e98c52 4095 op_type = OBJ_OP_DISCARD;
c2df40df 4096 else if (req_op(rq) == REQ_OP_WRITE)
6d2940c8
GZ
4097 op_type = OBJ_OP_WRITE;
4098 else
4099 op_type = OBJ_OP_READ;
4100
bc1ecc65 4101 /* Ignore/skip any zero-length requests */
bf0d5f50 4102
bc1ecc65
ID
4103 if (!length) {
4104 dout("%s: zero-length request\n", __func__);
4105 result = 0;
4106 goto err_rq;
4107 }
bf0d5f50 4108
6d2940c8 4109 /* Only reads are allowed to a read-only device */
bc1ecc65 4110
6d2940c8 4111 if (op_type != OBJ_OP_READ) {
bc1ecc65
ID
4112 if (rbd_dev->mapping.read_only) {
4113 result = -EROFS;
4114 goto err_rq;
4dda41d3 4115 }
bc1ecc65
ID
4116 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4117 }
4dda41d3 4118
bc1ecc65
ID
4119 /*
4120 * Quit early if the mapped snapshot no longer exists. It's
4121 * still possible the snapshot will have disappeared by the
4122 * time our request arrives at the osd, but there's no sense in
4123 * sending it if we already know.
4124 */
4125 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4126 dout("request for non-existent snapshot");
4127 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4128 result = -ENXIO;
4129 goto err_rq;
4130 }
4dda41d3 4131
bc1ecc65
ID
4132 if (offset && length > U64_MAX - offset + 1) {
4133 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4134 length);
4135 result = -EINVAL;
4136 goto err_rq; /* Shouldn't happen */
4137 }
4dda41d3 4138
7ad18afa
CH
4139 blk_mq_start_request(rq);
4140
4e752f0a
JD
4141 down_read(&rbd_dev->header_rwsem);
4142 mapping_size = rbd_dev->mapping.size;
6d2940c8 4143 if (op_type != OBJ_OP_READ) {
4e752f0a
JD
4144 snapc = rbd_dev->header.snapc;
4145 ceph_get_snap_context(snapc);
ed95b21a 4146 must_be_locked = rbd_is_lock_supported(rbd_dev);
80de1912
ID
4147 } else {
4148 must_be_locked = rbd_dev->opts->lock_on_read &&
4149 rbd_is_lock_supported(rbd_dev);
4e752f0a
JD
4150 }
4151 up_read(&rbd_dev->header_rwsem);
4152
4153 if (offset + length > mapping_size) {
bc1ecc65 4154 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4e752f0a 4155 length, mapping_size);
bc1ecc65
ID
4156 result = -EIO;
4157 goto err_rq;
4158 }
bf0d5f50 4159
ed95b21a
ID
4160 if (must_be_locked) {
4161 down_read(&rbd_dev->lock_rwsem);
4162 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4163 rbd_wait_state_locked(rbd_dev);
4164 }
4165
6d2940c8 4166 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4e752f0a 4167 snapc);
bc1ecc65
ID
4168 if (!img_request) {
4169 result = -ENOMEM;
ed95b21a 4170 goto err_unlock;
bc1ecc65
ID
4171 }
4172 img_request->rq = rq;
70b16db8 4173 snapc = NULL; /* img_request consumes a ref */
bf0d5f50 4174
90e98c52
GZ
4175 if (op_type == OBJ_OP_DISCARD)
4176 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4177 NULL);
4178 else
4179 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4180 rq->bio);
bc1ecc65
ID
4181 if (result)
4182 goto err_img_request;
bf0d5f50 4183
bc1ecc65
ID
4184 result = rbd_img_request_submit(img_request);
4185 if (result)
4186 goto err_img_request;
bf0d5f50 4187
ed95b21a
ID
4188 if (must_be_locked)
4189 up_read(&rbd_dev->lock_rwsem);
bc1ecc65 4190 return;
bf0d5f50 4191
bc1ecc65
ID
4192err_img_request:
4193 rbd_img_request_put(img_request);
ed95b21a
ID
4194err_unlock:
4195 if (must_be_locked)
4196 up_read(&rbd_dev->lock_rwsem);
bc1ecc65
ID
4197err_rq:
4198 if (result)
4199 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
6d2940c8 4200 obj_op_name(op_type), length, offset, result);
e96a650a 4201 ceph_put_snap_context(snapc);
7ad18afa
CH
4202err:
4203 blk_mq_end_request(rq, result);
bc1ecc65 4204}
bf0d5f50 4205
7ad18afa
CH
4206static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4207 const struct blk_mq_queue_data *bd)
bc1ecc65 4208{
7ad18afa
CH
4209 struct request *rq = bd->rq;
4210 struct work_struct *work = blk_mq_rq_to_pdu(rq);
bf0d5f50 4211
7ad18afa
CH
4212 queue_work(rbd_wq, work);
4213 return BLK_MQ_RQ_QUEUE_OK;
bf0d5f50
AE
4214}
4215
602adf40
YS
4216static void rbd_free_disk(struct rbd_device *rbd_dev)
4217{
4218 struct gendisk *disk = rbd_dev->disk;
4219
4220 if (!disk)
4221 return;
4222
a0cab924
AE
4223 rbd_dev->disk = NULL;
4224 if (disk->flags & GENHD_FL_UP) {
602adf40 4225 del_gendisk(disk);
a0cab924
AE
4226 if (disk->queue)
4227 blk_cleanup_queue(disk->queue);
7ad18afa 4228 blk_mq_free_tag_set(&rbd_dev->tag_set);
a0cab924 4229 }
602adf40
YS
4230 put_disk(disk);
4231}
4232
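/*
 * Synchronously read @length bytes from @object_name starting at
 * @offset into @buf.  Used here to read the v1 on-disk image header.
 */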
788e2df3
AE
4233static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4234 const char *object_name,
7097f8df 4235 u64 offset, u64 length, void *buf)
788e2df3
AE
4236
4237{
788e2df3 4238 struct rbd_obj_request *obj_request;
788e2df3
AE
4239 struct page **pages = NULL;
4240 u32 page_count;
1ceae7ef 4241 size_t size;
788e2df3
AE
4242 int ret;
4243
4244 page_count = (u32) calc_pages_for(offset, length);
4245 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
4246 if (IS_ERR(pages))
a8d42056 4247 return PTR_ERR(pages);
788e2df3
AE
4248
4249 ret = -ENOMEM;
4250 obj_request = rbd_obj_request_create(object_name, offset, length,
36be9a76 4251 OBJ_REQUEST_PAGES);
788e2df3
AE
4252 if (!obj_request)
4253 goto out;
4254
4255 obj_request->pages = pages;
4256 obj_request->page_count = page_count;
4257
6d2940c8 4258 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
deb236b3 4259 obj_request);
788e2df3
AE
4260 if (!obj_request->osd_req)
4261 goto out;
4262
c99d2d4a
AE
4263 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
4264 offset, length, 0, 0);
406e2c9f 4265 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
a4ce40a9 4266 obj_request->pages,
44cd188d
AE
4267 obj_request->length,
4268 obj_request->offset & ~PAGE_MASK,
4269 false, false);
9d4df01f 4270 rbd_osd_req_format_read(obj_request);
430c28c3 4271
980917fc 4272 rbd_obj_request_submit(obj_request);
788e2df3
AE
4273 ret = rbd_obj_request_wait(obj_request);
4274 if (ret)
4275 goto out;
4276
4277 ret = obj_request->result;
4278 if (ret < 0)
4279 goto out;
1ceae7ef
AE
4280
4281 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
4282 size = (size_t) obj_request->xferred;
903bb32e 4283 ceph_copy_from_page_vector(pages, buf, 0, size);
7097f8df
AE
4284 rbd_assert(size <= (size_t)INT_MAX);
4285 ret = (int)size;
788e2df3
AE
4286out:
4287 if (obj_request)
4288 rbd_obj_request_put(obj_request);
4289 else
4290 ceph_release_page_vector(pages, page_count);
4291
4292 return ret;
4293}
4294
602adf40 4295/*
662518b1
AE
4296 * Read the complete header for the given rbd device. On successful
4297 * return, the rbd_dev->header field will contain up-to-date
4298 * information about the image.
602adf40 4299 */
99a41ebc 4300static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
602adf40 4301{
4156d998 4302 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 4303 u32 snap_count = 0;
4156d998
AE
4304 u64 names_size = 0;
4305 u32 want_count;
4306 int ret;
602adf40 4307
00f1f36f 4308 /*
4156d998
AE
4309 * The complete header will include an array of its 64-bit
4310 * snapshot ids, followed by the names of those snapshots as
4311 * a contiguous block of NUL-terminated strings. Note that
4312 * the number of snapshots could change by the time we read
4313 * it in, in which case we re-read it.
00f1f36f 4314 */
4156d998
AE
4315 do {
4316 size_t size;
4317
4318 kfree(ondisk);
4319
4320 size = sizeof (*ondisk);
4321 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4322 size += names_size;
4323 ondisk = kmalloc(size, GFP_KERNEL);
4324 if (!ondisk)
662518b1 4325 return -ENOMEM;
4156d998 4326
c41d13a3 4327 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_oid.name,
7097f8df 4328 0, size, ondisk);
4156d998 4329 if (ret < 0)
662518b1 4330 goto out;
c0cd10db 4331 if ((size_t)ret < size) {
4156d998 4332 ret = -ENXIO;
06ecc6cb
AE
4333 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4334 size, ret);
662518b1 4335 goto out;
4156d998
AE
4336 }
4337 if (!rbd_dev_ondisk_valid(ondisk)) {
4338 ret = -ENXIO;
06ecc6cb 4339 rbd_warn(rbd_dev, "invalid header");
662518b1 4340 goto out;
81e759fb 4341 }
602adf40 4342
4156d998
AE
4343 names_size = le64_to_cpu(ondisk->snap_names_len);
4344 want_count = snap_count;
4345 snap_count = le32_to_cpu(ondisk->snap_count);
4346 } while (snap_count != want_count);
00f1f36f 4347
662518b1
AE
4348 ret = rbd_header_from_disk(rbd_dev, ondisk);
4349out:
4156d998
AE
4350 kfree(ondisk);
4351
4352 return ret;
602adf40
YS
4353}
4354
15228ede
AE
4355/*
4356 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4357 * has disappeared from the (just updated) snapshot context.
4358 */
4359static void rbd_exists_validate(struct rbd_device *rbd_dev)
4360{
4361 u64 snap_id;
4362
4363 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4364 return;
4365
4366 snap_id = rbd_dev->spec->snap_id;
4367 if (snap_id == CEPH_NOSNAP)
4368 return;
4369
4370 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4371 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4372}
4373
9875201e
JD
4374static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4375{
4376 sector_t size;
9875201e
JD
4377
4378 /*
811c6688
ID
4379 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4380 * try to update its size. If REMOVING is set, updating size
4381 * is just useless work since the device can't be opened.
9875201e 4382 */
811c6688
ID
4383 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4384 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
9875201e
JD
4385 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4386 dout("setting size to %llu sectors", (unsigned long long)size);
4387 set_capacity(rbd_dev->disk, size);
4388 revalidate_disk(rbd_dev->disk);
4389 }
4390}
4391
cc4a38bd 4392static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 4393{
e627db08 4394 u64 mapping_size;
1fe5e993
AE
4395 int ret;
4396
cfbf6377 4397 down_write(&rbd_dev->header_rwsem);
3b5cf2a2 4398 mapping_size = rbd_dev->mapping.size;
a720ae09
ID
4399
4400 ret = rbd_dev_header_info(rbd_dev);
52bb1f9b 4401 if (ret)
73e39e4d 4402 goto out;
15228ede 4403
e8f59b59
ID
4404 /*
4405 * If there is a parent, see if it has disappeared due to the
4406 * mapped image getting flattened.
4407 */
4408 if (rbd_dev->parent) {
4409 ret = rbd_dev_v2_parent_info(rbd_dev);
4410 if (ret)
73e39e4d 4411 goto out;
e8f59b59
ID
4412 }
4413
5ff1108c 4414 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
73e39e4d 4415 rbd_dev->mapping.size = rbd_dev->header.image_size;
5ff1108c
ID
4416 } else {
4417 /* validate mapped snapshot's EXISTS flag */
4418 rbd_exists_validate(rbd_dev);
4419 }
15228ede 4420
73e39e4d 4421out:
cfbf6377 4422 up_write(&rbd_dev->header_rwsem);
73e39e4d 4423 if (!ret && mapping_size != rbd_dev->mapping.size)
9875201e 4424 rbd_dev_update_size(rbd_dev);
1fe5e993 4425
73e39e4d 4426 return ret;
1fe5e993
AE
4427}
4428
7ad18afa
CH
4429static int rbd_init_request(void *data, struct request *rq,
4430 unsigned int hctx_idx, unsigned int request_idx,
4431 unsigned int numa_node)
4432{
4433 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4434
4435 INIT_WORK(work, rbd_queue_workfn);
4436 return 0;
4437}
4438
4439static struct blk_mq_ops rbd_mq_ops = {
4440 .queue_rq = rbd_queue_rq,
4441 .map_queue = blk_mq_map_queue,
4442 .init_request = rbd_init_request,
4443};
4444
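/*
 * Allocate the gendisk and set up the blk-mq tag set and request
 * queue; the I/O limits are derived from the rbd object size.
 */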
602adf40
YS
4445static int rbd_init_disk(struct rbd_device *rbd_dev)
4446{
4447 struct gendisk *disk;
4448 struct request_queue *q;
593a9e7b 4449 u64 segment_size;
7ad18afa 4450 int err;
602adf40 4451
602adf40 4452 /* create gendisk info */
7e513d43
ID
4453 disk = alloc_disk(single_major ?
4454 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4455 RBD_MINORS_PER_MAJOR);
602adf40 4456 if (!disk)
1fcdb8aa 4457 return -ENOMEM;
602adf40 4458
f0f8cef5 4459 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 4460 rbd_dev->dev_id);
602adf40 4461 disk->major = rbd_dev->major;
dd82fff1 4462 disk->first_minor = rbd_dev->minor;
7e513d43
ID
4463 if (single_major)
4464 disk->flags |= GENHD_FL_EXT_DEVT;
602adf40
YS
4465 disk->fops = &rbd_bd_ops;
4466 disk->private_data = rbd_dev;
4467
7ad18afa
CH
4468 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4469 rbd_dev->tag_set.ops = &rbd_mq_ops;
b5584180 4470 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
7ad18afa 4471 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
b5584180 4472 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
7ad18afa
CH
4473 rbd_dev->tag_set.nr_hw_queues = 1;
4474 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4475
4476 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4477 if (err)
602adf40 4478 goto out_disk;
029bcbd8 4479
7ad18afa
CH
4480 q = blk_mq_init_queue(&rbd_dev->tag_set);
4481 if (IS_ERR(q)) {
4482 err = PTR_ERR(q);
4483 goto out_tag_set;
4484 }
4485
d8a2c89c
ID
4486 queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4487 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
593a9e7b 4488
029bcbd8 4489 /* set io sizes to object size */
593a9e7b
AE
4490 segment_size = rbd_obj_bytes(&rbd_dev->header);
4491 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
0d9fde4f 4492 q->limits.max_sectors = queue_max_hw_sectors(q);
d3834fef 4493 blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
593a9e7b
AE
4494 blk_queue_max_segment_size(q, segment_size);
4495 blk_queue_io_min(q, segment_size);
4496 blk_queue_io_opt(q, segment_size);
029bcbd8 4497
90e98c52
GZ
4498 /* enable the discard support */
4499 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4500 q->limits.discard_granularity = segment_size;
4501 q->limits.discard_alignment = segment_size;
2bb4cd5c 4502 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
b76f8239 4503 q->limits.discard_zeroes_data = 1;
90e98c52 4504
bae818ee
RH
4505 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4506 q->backing_dev_info.capabilities |= BDI_CAP_STABLE_WRITES;
4507
602adf40
YS
4508 disk->queue = q;
4509
4510 q->queuedata = rbd_dev;
4511
4512 rbd_dev->disk = disk;
602adf40 4513
602adf40 4514 return 0;
7ad18afa
CH
4515out_tag_set:
4516 blk_mq_free_tag_set(&rbd_dev->tag_set);
602adf40
YS
4517out_disk:
4518 put_disk(disk);
7ad18afa 4519 return err;
602adf40
YS
4520}
4521
dfc5606d
YS
4522/*
4523 sysfs
4524*/
4525
593a9e7b
AE
4526static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4527{
4528 return container_of(dev, struct rbd_device, dev);
4529}
4530
dfc5606d
YS
4531static ssize_t rbd_size_show(struct device *dev,
4532 struct device_attribute *attr, char *buf)
4533{
593a9e7b 4534 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 4535
fc71d833
AE
4536 return sprintf(buf, "%llu\n",
4537 (unsigned long long)rbd_dev->mapping.size);
dfc5606d
YS
4538}
4539
34b13184
AE
4540/*
4541 * Note this shows the features for whatever's mapped, which is not
4542 * necessarily the base image.
4543 */
4544static ssize_t rbd_features_show(struct device *dev,
4545 struct device_attribute *attr, char *buf)
4546{
4547 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4548
4549 return sprintf(buf, "0x%016llx\n",
fc71d833 4550 (unsigned long long)rbd_dev->mapping.features);
34b13184
AE
4551}
4552
dfc5606d
YS
4553static ssize_t rbd_major_show(struct device *dev,
4554 struct device_attribute *attr, char *buf)
4555{
593a9e7b 4556 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4557
fc71d833
AE
4558 if (rbd_dev->major)
4559 return sprintf(buf, "%d\n", rbd_dev->major);
4560
4561 return sprintf(buf, "(none)\n");
dd82fff1
ID
4562}
4563
4564static ssize_t rbd_minor_show(struct device *dev,
4565 struct device_attribute *attr, char *buf)
4566{
4567 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
fc71d833 4568
dd82fff1 4569 return sprintf(buf, "%d\n", rbd_dev->minor);
dfc5606d
YS
4570}
4571
005a07bf
ID
4572static ssize_t rbd_client_addr_show(struct device *dev,
4573 struct device_attribute *attr, char *buf)
4574{
4575 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4576 struct ceph_entity_addr *client_addr =
4577 ceph_client_addr(rbd_dev->rbd_client->client);
4578
4579 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4580 le32_to_cpu(client_addr->nonce));
4581}
4582
dfc5606d
YS
4583static ssize_t rbd_client_id_show(struct device *dev,
4584 struct device_attribute *attr, char *buf)
602adf40 4585{
593a9e7b 4586 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4587
1dbb4399 4588 return sprintf(buf, "client%lld\n",
033268a5 4589 ceph_client_gid(rbd_dev->rbd_client->client));
602adf40
YS
4590}
4591
267fb90b
MC
4592static ssize_t rbd_cluster_fsid_show(struct device *dev,
4593 struct device_attribute *attr, char *buf)
4594{
4595 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4596
4597 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4598}
4599
0d6d1e9c
MC
4600static ssize_t rbd_config_info_show(struct device *dev,
4601 struct device_attribute *attr, char *buf)
4602{
4603 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4604
4605 return sprintf(buf, "%s\n", rbd_dev->config_info);
4606}
4607
dfc5606d
YS
4608static ssize_t rbd_pool_show(struct device *dev,
4609 struct device_attribute *attr, char *buf)
602adf40 4610{
593a9e7b 4611 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4612
0d7dbfce 4613 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
4614}
4615
9bb2f334
AE
4616static ssize_t rbd_pool_id_show(struct device *dev,
4617 struct device_attribute *attr, char *buf)
4618{
4619 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4620
0d7dbfce 4621 return sprintf(buf, "%llu\n",
fc71d833 4622 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
4623}
4624
dfc5606d
YS
4625static ssize_t rbd_name_show(struct device *dev,
4626 struct device_attribute *attr, char *buf)
4627{
593a9e7b 4628 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4629
a92ffdf8
AE
4630 if (rbd_dev->spec->image_name)
4631 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4632
4633 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
4634}
4635
589d30e0
AE
4636static ssize_t rbd_image_id_show(struct device *dev,
4637 struct device_attribute *attr, char *buf)
4638{
4639 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4640
0d7dbfce 4641 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
4642}
4643
34b13184
AE
4644/*
4645 * Shows the name of the currently-mapped snapshot (or
4646 * RBD_SNAP_HEAD_NAME for the base image).
4647 */
dfc5606d
YS
4648static ssize_t rbd_snap_show(struct device *dev,
4649 struct device_attribute *attr,
4650 char *buf)
4651{
593a9e7b 4652 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4653
0d7dbfce 4654 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
4655}
4656
92a58671
MC
4657static ssize_t rbd_snap_id_show(struct device *dev,
4658 struct device_attribute *attr, char *buf)
4659{
4660 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4661
4662 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4663}
4664
86b00e0d 4665/*
ff96128f
ID
4666 * For a v2 image, shows the chain of parent images, separated by empty
4667 * lines. For v1 images or if there is no parent, shows "(no parent
4668 * image)".
86b00e0d
AE
4669 */
4670static ssize_t rbd_parent_show(struct device *dev,
ff96128f
ID
4671 struct device_attribute *attr,
4672 char *buf)
86b00e0d
AE
4673{
4674 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
ff96128f 4675 ssize_t count = 0;
86b00e0d 4676
ff96128f 4677 if (!rbd_dev->parent)
86b00e0d
AE
4678 return sprintf(buf, "(no parent image)\n");
4679
ff96128f
ID
4680 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4681 struct rbd_spec *spec = rbd_dev->parent_spec;
4682
4683 count += sprintf(&buf[count], "%s"
4684 "pool_id %llu\npool_name %s\n"
4685 "image_id %s\nimage_name %s\n"
4686 "snap_id %llu\nsnap_name %s\n"
4687 "overlap %llu\n",
4688 !count ? "" : "\n", /* first? */
4689 spec->pool_id, spec->pool_name,
4690 spec->image_id, spec->image_name ?: "(unknown)",
4691 spec->snap_id, spec->snap_name,
4692 rbd_dev->parent_overlap);
4693 }
4694
4695 return count;
86b00e0d
AE
4696}
4697
dfc5606d
YS
4698static ssize_t rbd_image_refresh(struct device *dev,
4699 struct device_attribute *attr,
4700 const char *buf,
4701 size_t size)
4702{
593a9e7b 4703 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 4704 int ret;
602adf40 4705
cc4a38bd 4706 ret = rbd_dev_refresh(rbd_dev);
e627db08 4707 if (ret)
52bb1f9b 4708 return ret;
b813623a 4709
52bb1f9b 4710 return size;
dfc5606d 4711}
602adf40 4712
dfc5606d 4713static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 4714static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d 4715static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
dd82fff1 4716static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
005a07bf 4717static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
dfc5606d 4718static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
267fb90b 4719static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
0d6d1e9c 4720static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
dfc5606d 4721static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 4722static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 4723static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 4724static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
4725static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4726static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
92a58671 4727static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
86b00e0d 4728static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
4729
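/* Exposed under /sys/bus/rbd/devices/<dev_id>/ */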
4730static struct attribute *rbd_attrs[] = {
4731 &dev_attr_size.attr,
34b13184 4732 &dev_attr_features.attr,
dfc5606d 4733 &dev_attr_major.attr,
dd82fff1 4734 &dev_attr_minor.attr,
005a07bf 4735 &dev_attr_client_addr.attr,
dfc5606d 4736 &dev_attr_client_id.attr,
267fb90b 4737 &dev_attr_cluster_fsid.attr,
0d6d1e9c 4738 &dev_attr_config_info.attr,
dfc5606d 4739 &dev_attr_pool.attr,
9bb2f334 4740 &dev_attr_pool_id.attr,
dfc5606d 4741 &dev_attr_name.attr,
589d30e0 4742 &dev_attr_image_id.attr,
dfc5606d 4743 &dev_attr_current_snap.attr,
92a58671 4744 &dev_attr_snap_id.attr,
86b00e0d 4745 &dev_attr_parent.attr,
dfc5606d 4746 &dev_attr_refresh.attr,
dfc5606d
YS
4747 NULL
4748};
4749
4750static struct attribute_group rbd_attr_group = {
4751 .attrs = rbd_attrs,
4752};
4753
4754static const struct attribute_group *rbd_attr_groups[] = {
4755 &rbd_attr_group,
4756 NULL
4757};
4758
6cac4695 4759static void rbd_dev_release(struct device *dev);
dfc5606d
YS
4760
4761static struct device_type rbd_device_type = {
4762 .name = "rbd",
4763 .groups = rbd_attr_groups,
6cac4695 4764 .release = rbd_dev_release,
dfc5606d
YS
4765};
4766
8b8fb99c
AE
4767static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4768{
4769 kref_get(&spec->kref);
4770
4771 return spec;
4772}
4773
4774static void rbd_spec_free(struct kref *kref);
4775static void rbd_spec_put(struct rbd_spec *spec)
4776{
4777 if (spec)
4778 kref_put(&spec->kref, rbd_spec_free);
4779}
4780
4781static struct rbd_spec *rbd_spec_alloc(void)
4782{
4783 struct rbd_spec *spec;
4784
4785 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4786 if (!spec)
4787 return NULL;
04077599
ID
4788
4789 spec->pool_id = CEPH_NOPOOL;
4790 spec->snap_id = CEPH_NOSNAP;
8b8fb99c
AE
4791 kref_init(&spec->kref);
4792
8b8fb99c
AE
4793 return spec;
4794}
4795
4796static void rbd_spec_free(struct kref *kref)
4797{
4798 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4799
4800 kfree(spec->pool_name);
4801 kfree(spec->image_id);
4802 kfree(spec->image_name);
4803 kfree(spec->snap_name);
4804 kfree(spec);
4805}
4806
1643dfa4 4807static void rbd_dev_free(struct rbd_device *rbd_dev)
dd5ac32d 4808{
99d16943 4809 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
ed95b21a 4810 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
99d16943 4811
c41d13a3 4812 ceph_oid_destroy(&rbd_dev->header_oid);
6b6dddbe 4813 ceph_oloc_destroy(&rbd_dev->header_oloc);
0d6d1e9c 4814 kfree(rbd_dev->config_info);
c41d13a3 4815
dd5ac32d
ID
4816 rbd_put_client(rbd_dev->rbd_client);
4817 rbd_spec_put(rbd_dev->spec);
4818 kfree(rbd_dev->opts);
4819 kfree(rbd_dev);
1643dfa4
ID
4820}
4821
4822static void rbd_dev_release(struct device *dev)
4823{
4824 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4825 bool need_put = !!rbd_dev->opts;
4826
4827 if (need_put) {
4828 destroy_workqueue(rbd_dev->task_wq);
4829 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4830 }
4831
4832 rbd_dev_free(rbd_dev);
dd5ac32d
ID
4833
4834 /*
4835 * This is racy, but way better than putting module outside of
4836 * the release callback. The race window is pretty small, so
4837 * doing something similar to dm (dm-builtin.c) is overkill.
4838 */
4839 if (need_put)
4840 module_put(THIS_MODULE);
4841}
4842
1643dfa4
ID
4843static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4844 struct rbd_spec *spec)
c53d5893
AE
4845{
4846 struct rbd_device *rbd_dev;
4847
1643dfa4 4848 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
c53d5893
AE
4849 if (!rbd_dev)
4850 return NULL;
4851
4852 spin_lock_init(&rbd_dev->lock);
4853 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
4854 init_rwsem(&rbd_dev->header_rwsem);
4855
c41d13a3 4856 ceph_oid_init(&rbd_dev->header_oid);
922dab61 4857 ceph_oloc_init(&rbd_dev->header_oloc);
c41d13a3 4858
99d16943
ID
4859 mutex_init(&rbd_dev->watch_mutex);
4860 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4861 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4862
ed95b21a
ID
4863 init_rwsem(&rbd_dev->lock_rwsem);
4864 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4865 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4866 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4867 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4868 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4869 init_waitqueue_head(&rbd_dev->lock_waitq);
4870
dd5ac32d
ID
4871 rbd_dev->dev.bus = &rbd_bus_type;
4872 rbd_dev->dev.type = &rbd_device_type;
4873 rbd_dev->dev.parent = &rbd_root_dev;
dd5ac32d
ID
4874 device_initialize(&rbd_dev->dev);
4875
c53d5893 4876 rbd_dev->rbd_client = rbdc;
d147543d 4877 rbd_dev->spec = spec;
0903e875 4878
7627151e
YZ
4879 rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
4880 rbd_dev->layout.stripe_count = 1;
4881 rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER;
4882 rbd_dev->layout.pool_id = spec->pool_id;
30c156d9 4883 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
0903e875 4884
1643dfa4
ID
4885 return rbd_dev;
4886}
4887
4888/*
4889 * Create a mapping rbd_dev.
4890 */
4891static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4892 struct rbd_spec *spec,
4893 struct rbd_options *opts)
4894{
4895 struct rbd_device *rbd_dev;
4896
4897 rbd_dev = __rbd_dev_create(rbdc, spec);
4898 if (!rbd_dev)
4899 return NULL;
4900
4901 rbd_dev->opts = opts;
4902
4903 /* get an id and fill in device name */
4904 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4905 minor_to_rbd_dev_id(1 << MINORBITS),
4906 GFP_KERNEL);
4907 if (rbd_dev->dev_id < 0)
4908 goto fail_rbd_dev;
4909
4910 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4911 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4912 rbd_dev->name);
4913 if (!rbd_dev->task_wq)
4914 goto fail_dev_id;
dd5ac32d 4915
1643dfa4
ID
4916 /* we have a ref from do_rbd_add() */
4917 __module_get(THIS_MODULE);
4918
4919 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
c53d5893 4920 return rbd_dev;
1643dfa4
ID
4921
4922fail_dev_id:
4923 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4924fail_rbd_dev:
4925 rbd_dev_free(rbd_dev);
4926 return NULL;
c53d5893
AE
4927}
4928
4929static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4930{
dd5ac32d
ID
4931 if (rbd_dev)
4932 put_device(&rbd_dev->dev);
c53d5893
AE
4933}
4934
9d475de5
AE
4935/*
4936 * Get the size and object order for an image snapshot, or if
4937 * snap_id is CEPH_NOSNAP, gets this information for the base
4938 * image.
4939 */
4940static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4941 u8 *order, u64 *snap_size)
4942{
4943 __le64 snapid = cpu_to_le64(snap_id);
4944 int ret;
4945 struct {
4946 u8 order;
4947 __le64 size;
4948 } __attribute__ ((packed)) size_buf = { 0 };
4949
c41d13a3 4950 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
9d475de5 4951 "rbd", "get_size",
4157976b 4952 &snapid, sizeof (snapid),
e2a58ee5 4953 &size_buf, sizeof (size_buf));
36be9a76 4954 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
4955 if (ret < 0)
4956 return ret;
57385b51
AE
4957 if (ret < sizeof (size_buf))
4958 return -ERANGE;
9d475de5 4959
c3545579 4960 if (order) {
c86f86e9 4961 *order = size_buf.order;
c3545579
JD
4962 dout(" order %u", (unsigned int)*order);
4963 }
9d475de5
AE
4964 *snap_size = le64_to_cpu(size_buf.size);
4965
c3545579
JD
4966 dout(" snap_id 0x%016llx snap_size = %llu\n",
4967 (unsigned long long)snap_id,
57385b51 4968 (unsigned long long)*snap_size);
9d475de5
AE
4969
4970 return 0;
4971}
4972
4973static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4974{
4975 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4976 &rbd_dev->header.obj_order,
4977 &rbd_dev->header.image_size);
4978}
4979
1e130199
AE
4980static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4981{
4982 void *reply_buf;
4983 int ret;
4984 void *p;
4985
4986 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4987 if (!reply_buf)
4988 return -ENOMEM;
4989
c41d13a3 4990 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
4157976b 4991 "rbd", "get_object_prefix", NULL, 0,
e2a58ee5 4992 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 4993 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
4994 if (ret < 0)
4995 goto out;
4996
4997 p = reply_buf;
4998 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
4999 p + ret, NULL, GFP_NOIO);
5000 ret = 0;
1e130199
AE
5001
5002 if (IS_ERR(rbd_dev->header.object_prefix)) {
5003 ret = PTR_ERR(rbd_dev->header.object_prefix);
5004 rbd_dev->header.object_prefix = NULL;
5005 } else {
5006 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
5007 }
1e130199
AE
5008out:
5009 kfree(reply_buf);
5010
5011 return ret;
5012}
5013
b1b5402a
AE
5014static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5015 u64 *snap_features)
5016{
5017 __le64 snapid = cpu_to_le64(snap_id);
5018 struct {
5019 __le64 features;
5020 __le64 incompat;
4157976b 5021 } __attribute__ ((packed)) features_buf = { 0 };
d3767f0f 5022 u64 unsup;
b1b5402a
AE
5023 int ret;
5024
c41d13a3 5025 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
b1b5402a 5026 "rbd", "get_features",
4157976b 5027 &snapid, sizeof (snapid),
e2a58ee5 5028 &features_buf, sizeof (features_buf));
36be9a76 5029 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
5030 if (ret < 0)
5031 return ret;
57385b51
AE
5032 if (ret < sizeof (features_buf))
5033 return -ERANGE;
d889140c 5034
d3767f0f
ID
5035 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5036 if (unsup) {
5037 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5038 unsup);
b8f5c6ed 5039 return -ENXIO;
d3767f0f 5040 }
d889140c 5041
b1b5402a
AE
5042 *snap_features = le64_to_cpu(features_buf.features);
5043
5044 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
5045 (unsigned long long)snap_id,
5046 (unsigned long long)*snap_features,
5047 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
5048
5049 return 0;
5050}
5051
5052static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5053{
5054 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5055 &rbd_dev->header.features);
5056}
5057
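/*
 * Call the "get_parent" class method and record or refresh the parent
 * spec and overlap for a layered (v2) image.
 */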
86b00e0d
AE
5058static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5059{
5060 struct rbd_spec *parent_spec;
5061 size_t size;
5062 void *reply_buf = NULL;
5063 __le64 snapid;
5064 void *p;
5065 void *end;
642a2537 5066 u64 pool_id;
86b00e0d 5067 char *image_id;
3b5cf2a2 5068 u64 snap_id;
86b00e0d 5069 u64 overlap;
86b00e0d
AE
5070 int ret;
5071
5072 parent_spec = rbd_spec_alloc();
5073 if (!parent_spec)
5074 return -ENOMEM;
5075
5076 size = sizeof (__le64) + /* pool_id */
5077 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
5078 sizeof (__le64) + /* snap_id */
5079 sizeof (__le64); /* overlap */
5080 reply_buf = kmalloc(size, GFP_KERNEL);
5081 if (!reply_buf) {
5082 ret = -ENOMEM;
5083 goto out_err;
5084 }
5085
4d9b67cd 5086 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
c41d13a3 5087 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
86b00e0d 5088 "rbd", "get_parent",
4157976b 5089 &snapid, sizeof (snapid),
e2a58ee5 5090 reply_buf, size);
36be9a76 5091 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
5092 if (ret < 0)
5093 goto out_err;
5094
86b00e0d 5095 p = reply_buf;
57385b51
AE
5096 end = reply_buf + ret;
5097 ret = -ERANGE;
642a2537 5098 ceph_decode_64_safe(&p, end, pool_id, out_err);
392a9dad
AE
5099 if (pool_id == CEPH_NOPOOL) {
5100 /*
5101 * Either the parent never existed, or we have a
5102 * record of it but the image got flattened so it no
5103 * longer has a parent. When the parent of a
5104 * layered image disappears we immediately set the
5105 * overlap to 0. The effect of this is that all new
5106 * requests will be treated as if the image had no
5107 * parent.
5108 */
5109 if (rbd_dev->parent_overlap) {
5110 rbd_dev->parent_overlap = 0;
392a9dad
AE
5111 rbd_dev_parent_put(rbd_dev);
5112 pr_info("%s: clone image has been flattened\n",
5113 rbd_dev->disk->disk_name);
5114 }
5115
86b00e0d 5116 goto out; /* No parent? No problem. */
392a9dad 5117 }
86b00e0d 5118
0903e875
AE
5119 /* The ceph file layout needs to fit pool id in 32 bits */
5120
5121 ret = -EIO;
642a2537 5122 if (pool_id > (u64)U32_MAX) {
9584d508 5123 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
642a2537 5124 (unsigned long long)pool_id, U32_MAX);
57385b51 5125 goto out_err;
c0cd10db 5126 }
0903e875 5127
979ed480 5128 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
5129 if (IS_ERR(image_id)) {
5130 ret = PTR_ERR(image_id);
5131 goto out_err;
5132 }
3b5cf2a2 5133 ceph_decode_64_safe(&p, end, snap_id, out_err);
86b00e0d
AE
5134 ceph_decode_64_safe(&p, end, overlap, out_err);
5135
3b5cf2a2
AE
5136 /*
5137 * The parent won't change (except when the clone is
5138 * flattened, which is handled above). So we only need to
5139 * record the parent spec if we have not already done so.
5140 */
5141 if (!rbd_dev->parent_spec) {
5142 parent_spec->pool_id = pool_id;
5143 parent_spec->image_id = image_id;
5144 parent_spec->snap_id = snap_id;
70cf49cf
AE
5145 rbd_dev->parent_spec = parent_spec;
5146 parent_spec = NULL; /* rbd_dev now owns this */
fbba11b3
ID
5147 } else {
5148 kfree(image_id);
3b5cf2a2
AE
5149 }
5150
5151 /*
cf32bd9c
ID
5152 * We always update the parent overlap. If it's zero we issue
5153 * a warning, as we will proceed as if there was no parent.
3b5cf2a2 5154 */
3b5cf2a2 5155 if (!overlap) {
3b5cf2a2 5156 if (parent_spec) {
cf32bd9c
ID
5157 /* refresh, careful to warn just once */
5158 if (rbd_dev->parent_overlap)
5159 rbd_warn(rbd_dev,
5160 "clone now standalone (overlap became 0)");
3b5cf2a2 5161 } else {
cf32bd9c
ID
5162 /* initial probe */
5163 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
3b5cf2a2 5164 }
70cf49cf 5165 }
cf32bd9c
ID
5166 rbd_dev->parent_overlap = overlap;
5167
86b00e0d
AE
5168out:
5169 ret = 0;
5170out_err:
5171 kfree(reply_buf);
5172 rbd_spec_put(parent_spec);
5173
5174 return ret;
5175}
5176
cc070d59
AE
5177static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5178{
5179 struct {
5180 __le64 stripe_unit;
5181 __le64 stripe_count;
5182 } __attribute__ ((packed)) striping_info_buf = { 0 };
5183 size_t size = sizeof (striping_info_buf);
5184 void *p;
5185 u64 obj_size;
5186 u64 stripe_unit;
5187 u64 stripe_count;
5188 int ret;
5189
c41d13a3 5190 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
cc070d59 5191 "rbd", "get_stripe_unit_count", NULL, 0,
e2a58ee5 5192 (char *)&striping_info_buf, size);
cc070d59
AE
5193 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5194 if (ret < 0)
5195 return ret;
5196 if (ret < size)
5197 return -ERANGE;
5198
5199 /*
5200 * We don't actually support the "fancy striping" feature
5201 * (STRIPINGV2) yet, but if the striping sizes are the
5202 * defaults, the behavior is the same as before. So find
5203 * out, and only fail if the image has non-default values.
5204 */
5205 ret = -EINVAL;
5206 obj_size = (u64)1 << rbd_dev->header.obj_order;
5207 p = &striping_info_buf;
5208 stripe_unit = ceph_decode_64(&p);
5209 if (stripe_unit != obj_size) {
5210 rbd_warn(rbd_dev, "unsupported stripe unit "
5211 "(got %llu want %llu)",
5212 stripe_unit, obj_size);
5213 return -EINVAL;
5214 }
5215 stripe_count = ceph_decode_64(&p);
5216 if (stripe_count != 1) {
5217 rbd_warn(rbd_dev, "unsupported stripe count "
5218 "(got %llu want 1)", stripe_count);
5219 return -EINVAL;
5220 }
500d0c0f
AE
5221 rbd_dev->header.stripe_unit = stripe_unit;
5222 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
5223
5224 return 0;
5225}
5226
9e15b77d
AE
5227static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5228{
5229 size_t image_id_size;
5230 char *image_id;
5231 void *p;
5232 void *end;
5233 size_t size;
5234 void *reply_buf = NULL;
5235 size_t len = 0;
5236 char *image_name = NULL;
5237 int ret;
5238
5239 rbd_assert(!rbd_dev->spec->image_name);
5240
69e7a02f
AE
5241 len = strlen(rbd_dev->spec->image_id);
5242 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
5243 image_id = kmalloc(image_id_size, GFP_KERNEL);
5244 if (!image_id)
5245 return NULL;
5246
5247 p = image_id;
4157976b 5248 end = image_id + image_id_size;
57385b51 5249 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
5250
5251 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5252 reply_buf = kmalloc(size, GFP_KERNEL);
5253 if (!reply_buf)
5254 goto out;
5255
36be9a76 5256 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
9e15b77d
AE
5257 "rbd", "dir_get_name",
5258 image_id, image_id_size,
e2a58ee5 5259 reply_buf, size);
9e15b77d
AE
5260 if (ret < 0)
5261 goto out;
5262 p = reply_buf;
f40eb349
AE
5263 end = reply_buf + ret;
5264
9e15b77d
AE
5265 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5266 if (IS_ERR(image_name))
5267 image_name = NULL;
5268 else
5269 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5270out:
5271 kfree(reply_buf);
5272 kfree(image_id);
5273
5274 return image_name;
5275}
5276
2ad3d716
AE
5277static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5278{
5279 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5280 const char *snap_name;
5281 u32 which = 0;
5282
5283 /* Skip over names until we find the one we are looking for */
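	/*
	 * Layout sketch (for reference): snap_names is assumed to be a
	 * packed sequence of NUL-terminated names, one per snapshot and
	 * in the same order as snapc->snaps, e.g. "first\0second\0".
	 */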
5284
5285 snap_name = rbd_dev->header.snap_names;
5286 while (which < snapc->num_snaps) {
5287 if (!strcmp(name, snap_name))
5288 return snapc->snaps[which];
5289 snap_name += strlen(snap_name) + 1;
5290 which++;
5291 }
5292 return CEPH_NOSNAP;
5293}
5294
5295static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5296{
5297 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5298 u32 which;
5299 bool found = false;
5300 u64 snap_id;
5301
5302 for (which = 0; !found && which < snapc->num_snaps; which++) {
5303 const char *snap_name;
5304
5305 snap_id = snapc->snaps[which];
5306 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
efadc98a
JD
5307 if (IS_ERR(snap_name)) {
5308 /* ignore no-longer existing snapshots */
5309 if (PTR_ERR(snap_name) == -ENOENT)
5310 continue;
5311 else
5312 break;
5313 }
2ad3d716
AE
5314 found = !strcmp(name, snap_name);
5315 kfree(snap_name);
5316 }
5317 return found ? snap_id : CEPH_NOSNAP;
5318}
5319
5320/*
5321 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5322 * no snapshot by that name is found, or if an error occurs.
5323 */
5324static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5325{
5326 if (rbd_dev->image_format == 1)
5327 return rbd_v1_snap_id_by_name(rbd_dev, name);
5328
5329 return rbd_v2_snap_id_by_name(rbd_dev, name);
5330}
5331
9e15b77d 5332/*
04077599
ID
5333 * An image being mapped will have everything but the snap id.
5334 */
5335static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5336{
5337 struct rbd_spec *spec = rbd_dev->spec;
5338
5339 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5340 rbd_assert(spec->image_id && spec->image_name);
5341 rbd_assert(spec->snap_name);
5342
5343 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5344 u64 snap_id;
5345
5346 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5347 if (snap_id == CEPH_NOSNAP)
5348 return -ENOENT;
5349
5350 spec->snap_id = snap_id;
5351 } else {
5352 spec->snap_id = CEPH_NOSNAP;
5353 }
5354
5355 return 0;
5356}
5357
5358/*
5359 * A parent image will have all ids but none of the names.
e1d4213f 5360 *
04077599
ID
5361 * All names in an rbd spec are dynamically allocated. It's OK if we
5362 * can't figure out the name for an image id.
9e15b77d 5363 */
04077599 5364static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
9e15b77d 5365{
2e9f7f1c
AE
5366 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5367 struct rbd_spec *spec = rbd_dev->spec;
5368 const char *pool_name;
5369 const char *image_name;
5370 const char *snap_name;
9e15b77d
AE
5371 int ret;
5372
04077599
ID
5373 rbd_assert(spec->pool_id != CEPH_NOPOOL);
5374 rbd_assert(spec->image_id);
5375 rbd_assert(spec->snap_id != CEPH_NOSNAP);
9e15b77d 5376
2e9f7f1c 5377 /* Get the pool name; we have to make our own copy of this */
9e15b77d 5378
2e9f7f1c
AE
5379 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5380 if (!pool_name) {
5381 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
5382 return -EIO;
5383 }
2e9f7f1c
AE
5384 pool_name = kstrdup(pool_name, GFP_KERNEL);
5385 if (!pool_name)
9e15b77d
AE
5386 return -ENOMEM;
5387
5388 /* Fetch the image name; tolerate failure here */
5389
2e9f7f1c
AE
5390 image_name = rbd_dev_image_name(rbd_dev);
5391 if (!image_name)
06ecc6cb 5392 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 5393
04077599 5394 /* Fetch the snapshot name */
9e15b77d 5395
2e9f7f1c 5396 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
da6a6b63
JD
5397 if (IS_ERR(snap_name)) {
5398 ret = PTR_ERR(snap_name);
9e15b77d 5399 goto out_err;
2e9f7f1c
AE
5400 }
5401
5402 spec->pool_name = pool_name;
5403 spec->image_name = image_name;
5404 spec->snap_name = snap_name;
9e15b77d
AE
5405
5406 return 0;
04077599 5407
9e15b77d 5408out_err:
2e9f7f1c
AE
5409 kfree(image_name);
5410 kfree(pool_name);
9e15b77d
AE
5411 return ret;
5412}
5413
cc4a38bd 5414static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
5415{
5416 size_t size;
5417 int ret;
5418 void *reply_buf;
5419 void *p;
5420 void *end;
5421 u64 seq;
5422 u32 snap_count;
5423 struct ceph_snap_context *snapc;
5424 u32 i;
5425
5426 /*
5427 * We'll need room for the seq value (maximum snapshot id),
5428 * snapshot count, and array of that many snapshot ids.
5429 * For now we have a fixed upper limit on the number we're
5430 * prepared to receive.
5431 */
5432 size = sizeof (__le64) + sizeof (__le32) +
5433 RBD_MAX_SNAP_COUNT * sizeof (__le64);
5434 reply_buf = kzalloc(size, GFP_KERNEL);
5435 if (!reply_buf)
5436 return -ENOMEM;
5437
c41d13a3 5438 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
4157976b 5439 "rbd", "get_snapcontext", NULL, 0,
e2a58ee5 5440 reply_buf, size);
36be9a76 5441 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
5442 if (ret < 0)
5443 goto out;
5444
35d489f9 5445 p = reply_buf;
57385b51
AE
5446 end = reply_buf + ret;
5447 ret = -ERANGE;
35d489f9
AE
5448 ceph_decode_64_safe(&p, end, seq, out);
5449 ceph_decode_32_safe(&p, end, snap_count, out);
5450
5451 /*
5452 * Make sure the reported number of snapshot ids wouldn't go
5453 * beyond the end of our buffer. But before checking that,
5454 * make sure the computed size of the snapshot context we
5455 * allocate is representable in a size_t.
5456 */
5457 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5458 / sizeof (u64)) {
5459 ret = -EINVAL;
5460 goto out;
5461 }
5462 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5463 goto out;
468521c1 5464 ret = 0;
35d489f9 5465
812164f8 5466 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
5467 if (!snapc) {
5468 ret = -ENOMEM;
5469 goto out;
5470 }
35d489f9 5471 snapc->seq = seq;
35d489f9
AE
5472 for (i = 0; i < snap_count; i++)
5473 snapc->snaps[i] = ceph_decode_64(&p);
5474
49ece554 5475 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
5476 rbd_dev->header.snapc = snapc;
5477
5478 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 5479 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
5480out:
5481 kfree(reply_buf);
5482
57385b51 5483 return ret;
35d489f9
AE
5484}
5485
54cac61f
AE
5486static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5487 u64 snap_id)
b8b1e2db
AE
5488{
5489 size_t size;
5490 void *reply_buf;
54cac61f 5491 __le64 snapid;
b8b1e2db
AE
5492 int ret;
5493 void *p;
5494 void *end;
b8b1e2db
AE
5495 char *snap_name;
5496
5497 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5498 reply_buf = kmalloc(size, GFP_KERNEL);
5499 if (!reply_buf)
5500 return ERR_PTR(-ENOMEM);
5501
54cac61f 5502 snapid = cpu_to_le64(snap_id);
c41d13a3 5503 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
b8b1e2db 5504 "rbd", "get_snapshot_name",
54cac61f 5505 &snapid, sizeof (snapid),
e2a58ee5 5506 reply_buf, size);
36be9a76 5507 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
5508 if (ret < 0) {
5509 snap_name = ERR_PTR(ret);
b8b1e2db 5510 goto out;
f40eb349 5511 }
b8b1e2db
AE
5512
5513 p = reply_buf;
f40eb349 5514 end = reply_buf + ret;
e5c35534 5515 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 5516 if (IS_ERR(snap_name))
b8b1e2db 5517 goto out;
b8b1e2db 5518
f40eb349 5519 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 5520 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
5521out:
5522 kfree(reply_buf);
5523
f40eb349 5524 return snap_name;
b8b1e2db
AE
5525}
5526
2df3fac7 5527static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 5528{
2df3fac7 5529 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 5530 int ret;
117973fb 5531
1617e40c
JD
5532 ret = rbd_dev_v2_image_size(rbd_dev);
5533 if (ret)
cfbf6377 5534 return ret;
1617e40c 5535
2df3fac7
AE
5536 if (first_time) {
5537 ret = rbd_dev_v2_header_onetime(rbd_dev);
5538 if (ret)
cfbf6377 5539 return ret;
2df3fac7
AE
5540 }
5541
cc4a38bd 5542 ret = rbd_dev_v2_snap_context(rbd_dev);
d194cd1d
ID
5543 if (ret && first_time) {
5544 kfree(rbd_dev->header.object_prefix);
5545 rbd_dev->header.object_prefix = NULL;
5546 }
117973fb
AE
5547
5548 return ret;
5549}
5550
a720ae09
ID
5551static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5552{
5553 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5554
5555 if (rbd_dev->image_format == 1)
5556 return rbd_dev_v1_header_info(rbd_dev);
5557
5558 return rbd_dev_v2_header_info(rbd_dev);
5559}
5560
e28fff26
AE
5561/*
5562 * Skips over white space at *buf, and updates *buf to point to the
5563 * first found non-space character (if any). Returns the length of
593a9e7b
AE
5564 * the token (string of non-white space characters) found. Note
5565 * that *buf must be terminated with '\0'.
e28fff26
AE
5566 */
5567static inline size_t next_token(const char **buf)
5568{
5569 /*
5570 * These are the characters that produce nonzero for
5571 * isspace() in the "C" and "POSIX" locales.
5572 */
5573 const char *spaces = " \f\n\r\t\v";
5574
5575 *buf += strspn(*buf, spaces); /* Find start of token */
5576
5577 return strcspn(*buf, spaces); /* Return token length */
5578}
5579
ea3352f4
AE
5580/*
5581 * Finds the next token in *buf, dynamically allocates a buffer big
5582 * enough to hold a copy of it, and copies the token into the new
5583 * buffer. The copy is guaranteed to be terminated with '\0'. Note
5584 * that a duplicate buffer is created even for a zero-length token.
5585 *
5586 * Returns a pointer to the newly-allocated duplicate, or a null
5587 * pointer if memory for the duplicate was not available. If
5588 * the lenp argument is a non-null pointer, the length of the token
5589 * (not including the '\0') is returned in *lenp.
5590 *
5591 * If successful, the *buf pointer will be updated to point beyond
5592 * the end of the found token.
5593 *
5594 * Note: uses GFP_KERNEL for allocation.
5595 */
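/*
 * Sketch of the intended behavior: with *buf pointing at "  pool image",
 * two successive dup_token() calls return "pool" and then "image",
 * advancing *buf past each token.
 */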
5596static inline char *dup_token(const char **buf, size_t *lenp)
5597{
5598 char *dup;
5599 size_t len;
5600
5601 len = next_token(buf);
4caf35f9 5602 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
5603 if (!dup)
5604 return NULL;
ea3352f4
AE
5605 *(dup + len) = '\0';
5606 *buf += len;
5607
5608 if (lenp)
5609 *lenp = len;
5610
5611 return dup;
5612}
5613
a725f65e 5614/*
859c31df
AE
5615 * Parse the options provided for an "rbd add" (i.e., rbd image
5616 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
5617 * and the data written is passed here via a NUL-terminated buffer.
5618 * Returns 0 if successful or an error code otherwise.
d22f76e7 5619 *
859c31df
AE
5620 * The information extracted from these options is recorded in
5621 * the other parameters which return dynamically-allocated
5622 * structures:
5623 * ceph_opts
5624 * The address of a pointer that will refer to a ceph options
5625 * structure. Caller must release the returned pointer using
5626 * ceph_destroy_options() when it is no longer needed.
5627 * rbd_opts
5628 * Address of an rbd options pointer. Fully initialized by
5629 * this function; caller must release with kfree().
5630 * spec
5631 * Address of an rbd image specification pointer. Fully
5632 * initialized by this function based on parsed options.
5633 * Caller must release with rbd_spec_put().
5634 *
5635 * The options passed take this form:
5636 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
5637 * where:
5638 * <mon_addrs>
5639 * A comma-separated list of one or more monitor addresses.
5640 * A monitor address is an ip address, optionally followed
5641 * by a port number (separated by a colon).
5642 * I.e.: ip1[:port1][,ip2[:port2]...]
5643 * <options>
5644 * A comma-separated list of ceph and/or rbd options.
5645 * <pool_name>
5646 * The name of the rados pool containing the rbd image.
5647 * <image_name>
5648 * The name of the image in that pool to map.
5649 * <snap_name>
5650 * An optional snapshot name. If provided, the mapping will
5651 * present data from the image at the time that snapshot was
5652 * created. The image head is used if no snapshot name is
5653 * provided. Snapshot mappings are always read-only.
a725f65e 5654 */
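/*
 * Illustrative example only (hypothetical monitor address, pool and image
 * names): writing
 *   1.2.3.4:6789 name=admin,secret=<key> rbd myimage -
 * to /sys/bus/rbd/add would map the head of image "myimage" from pool
 * "rbd", using that monitor and those ceph options.
 */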
859c31df 5655static int rbd_add_parse_args(const char *buf,
dc79b113 5656 struct ceph_options **ceph_opts,
859c31df
AE
5657 struct rbd_options **opts,
5658 struct rbd_spec **rbd_spec)
e28fff26 5659{
d22f76e7 5660 size_t len;
859c31df 5661 char *options;
0ddebc0c 5662 const char *mon_addrs;
ecb4dc22 5663 char *snap_name;
0ddebc0c 5664 size_t mon_addrs_size;
859c31df 5665 struct rbd_spec *spec = NULL;
4e9afeba 5666 struct rbd_options *rbd_opts = NULL;
859c31df 5667 struct ceph_options *copts;
dc79b113 5668 int ret;
e28fff26
AE
5669
5670 /* The first four tokens are required */
5671
7ef3214a 5672 len = next_token(&buf);
4fb5d671
AE
5673 if (!len) {
5674 rbd_warn(NULL, "no monitor address(es) provided");
5675 return -EINVAL;
5676 }
0ddebc0c 5677 mon_addrs = buf;
f28e565a 5678 mon_addrs_size = len + 1;
7ef3214a 5679 buf += len;
a725f65e 5680
dc79b113 5681 ret = -EINVAL;
f28e565a
AE
5682 options = dup_token(&buf, NULL);
5683 if (!options)
dc79b113 5684 return -ENOMEM;
4fb5d671
AE
5685 if (!*options) {
5686 rbd_warn(NULL, "no options provided");
5687 goto out_err;
5688 }
e28fff26 5689
859c31df
AE
5690 spec = rbd_spec_alloc();
5691 if (!spec)
f28e565a 5692 goto out_mem;
859c31df
AE
5693
5694 spec->pool_name = dup_token(&buf, NULL);
5695 if (!spec->pool_name)
5696 goto out_mem;
4fb5d671
AE
5697 if (!*spec->pool_name) {
5698 rbd_warn(NULL, "no pool name provided");
5699 goto out_err;
5700 }
e28fff26 5701
69e7a02f 5702 spec->image_name = dup_token(&buf, NULL);
859c31df 5703 if (!spec->image_name)
f28e565a 5704 goto out_mem;
4fb5d671
AE
5705 if (!*spec->image_name) {
5706 rbd_warn(NULL, "no image name provided");
5707 goto out_err;
5708 }
d4b125e9 5709
f28e565a
AE
5710 /*
5711 * Snapshot name is optional; default is to use "-"
5712 * (indicating the head/no snapshot).
5713 */
3feeb894 5714 len = next_token(&buf);
820a5f3e 5715 if (!len) {
3feeb894
AE
5716 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5717 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 5718 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 5719 ret = -ENAMETOOLONG;
f28e565a 5720 goto out_err;
849b4260 5721 }
ecb4dc22
AE
5722 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5723 if (!snap_name)
f28e565a 5724 goto out_mem;
ecb4dc22
AE
5725 *(snap_name + len) = '\0';
5726 spec->snap_name = snap_name;
e5c35534 5727
0ddebc0c 5728 /* Initialize all rbd options to the defaults */
e28fff26 5729
4e9afeba
AE
5730 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5731 if (!rbd_opts)
5732 goto out_mem;
5733
5734 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
b5584180 5735 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
80de1912 5736 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
d22f76e7 5737
859c31df 5738 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 5739 mon_addrs + mon_addrs_size - 1,
4e9afeba 5740 parse_rbd_opts_token, rbd_opts);
859c31df
AE
5741 if (IS_ERR(copts)) {
5742 ret = PTR_ERR(copts);
dc79b113
AE
5743 goto out_err;
5744 }
859c31df
AE
5745 kfree(options);
5746
5747 *ceph_opts = copts;
4e9afeba 5748 *opts = rbd_opts;
859c31df 5749 *rbd_spec = spec;
0ddebc0c 5750
dc79b113 5751 return 0;
f28e565a 5752out_mem:
dc79b113 5753 ret = -ENOMEM;
d22f76e7 5754out_err:
859c31df
AE
5755 kfree(rbd_opts);
5756 rbd_spec_put(spec);
f28e565a 5757 kfree(options);
d22f76e7 5758
dc79b113 5759 return ret;
a725f65e
AE
5760}
5761
30ba1f02
ID
5762/*
5763 * Return pool id (>= 0) or a negative error code.
5764 */
5765static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5766{
a319bf56 5767 struct ceph_options *opts = rbdc->client->options;
30ba1f02 5768 u64 newest_epoch;
30ba1f02
ID
5769 int tries = 0;
5770 int ret;
5771
5772again:
5773 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5774 if (ret == -ENOENT && tries++ < 1) {
d0b19705
ID
5775 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5776 &newest_epoch);
30ba1f02
ID
5777 if (ret < 0)
5778 return ret;
5779
5780 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
7cca78c9 5781 ceph_osdc_maybe_request_map(&rbdc->client->osdc);
30ba1f02 5782 (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
a319bf56
ID
5783 newest_epoch,
5784 opts->mount_timeout);
30ba1f02
ID
5785 goto again;
5786 } else {
5787 /* the osdmap we have is new enough */
5788 return -ENOENT;
5789 }
5790 }
5791
5792 return ret;
5793}
5794
589d30e0
AE
5795/*
5796 * An rbd format 2 image has a unique identifier, distinct from the
5797 * name given to it by the user. Internally, that identifier is
5798 * what's used to specify the names of objects related to the image.
5799 *
5800 * A special "rbd id" object is used to map an rbd image name to its
5801 * id. If that object doesn't exist, then there is no v2 rbd image
5802 * with the supplied name.
5803 *
5804 * This function will set the given rbd_dev's image_id field if the
5805 * id can be determined, and in that case will return 0. If any
5806 * errors occur a negative errno will be returned and the rbd_dev's
5807 * image_id field will be unchanged (and should be NULL).
5808 */
5809static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5810{
5811 int ret;
5812 size_t size;
5813 char *object_name;
5814 void *response;
c0fba368 5815 char *image_id;
2f82ee54 5816
2c0d0a10
AE
5817 /*
5818 * When probing a parent image, the image id is already
5819 * known (and the image name likely is not). There's no
c0fba368
AE
5820 * need to fetch the image id again in this case. We
5821 * do still need to set the image format though.
2c0d0a10 5822 */
c0fba368
AE
5823 if (rbd_dev->spec->image_id) {
5824 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5825
2c0d0a10 5826 return 0;
c0fba368 5827 }
2c0d0a10 5828
589d30e0
AE
5829 /*
5830 * First, see if the format 2 image id file exists, and if
5831 * so, get the image's persistent id from it.
5832 */
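	/*
	 * The id object's name is RBD_ID_PREFIX followed by the image
	 * name, e.g. "rbd_id.foo" for an image named "foo" (assuming the
	 * prefix defined in rbd_types.h).
	 */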
69e7a02f 5833 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
5834 object_name = kmalloc(size, GFP_NOIO);
5835 if (!object_name)
5836 return -ENOMEM;
0d7dbfce 5837 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
5838 dout("rbd id object name is %s\n", object_name);
5839
5840 /* Response will be an encoded string, which includes a length */
5841
5842 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5843 response = kzalloc(size, GFP_NOIO);
5844 if (!response) {
5845 ret = -ENOMEM;
5846 goto out;
5847 }
5848
c0fba368
AE
5849 /* If it doesn't exist we'll assume it's a format 1 image */
5850
36be9a76 5851 ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 5852 "rbd", "get_id", NULL, 0,
e2a58ee5 5853 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 5854 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
5855 if (ret == -ENOENT) {
5856 image_id = kstrdup("", GFP_KERNEL);
5857 ret = image_id ? 0 : -ENOMEM;
5858 if (!ret)
5859 rbd_dev->image_format = 1;
7dd440c9 5860 } else if (ret >= 0) {
c0fba368
AE
5861 void *p = response;
5862
5863 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 5864 NULL, GFP_NOIO);
461f758a 5865 ret = PTR_ERR_OR_ZERO(image_id);
c0fba368
AE
5866 if (!ret)
5867 rbd_dev->image_format = 2;
c0fba368
AE
5868 }
5869
5870 if (!ret) {
5871 rbd_dev->spec->image_id = image_id;
5872 dout("image_id is %s\n", image_id);
589d30e0
AE
5873 }
5874out:
5875 kfree(response);
5876 kfree(object_name);
5877
5878 return ret;
5879}
5880
3abef3b3
AE
5881/*
5882 * Undo whatever state changes are made by v1 or v2 header info
5883 * call.
5884 */
6fd48b3b
AE
5885static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5886{
5887 struct rbd_image_header *header;
5888
e69b8d41 5889 rbd_dev_parent_put(rbd_dev);
6fd48b3b
AE
5890
5891 /* Free dynamic fields from the header, then zero it out */
5892
5893 header = &rbd_dev->header;
812164f8 5894 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
5895 kfree(header->snap_sizes);
5896 kfree(header->snap_names);
5897 kfree(header->object_prefix);
5898 memset(header, 0, sizeof (*header));
5899}
5900
2df3fac7 5901static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9
AE
5902{
5903 int ret;
a30b71b9 5904
1e130199 5905 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 5906 if (ret)
b1b5402a
AE
5907 goto out_err;
5908
2df3fac7
AE
5909 /*
5910 * Get and check the features for the image. Currently the
5911 * features are assumed to never change.
5912 */
b1b5402a 5913 ret = rbd_dev_v2_features(rbd_dev);
57385b51 5914 if (ret)
9d475de5 5915 goto out_err;
35d489f9 5916
cc070d59
AE
5917 /* If the image supports fancy striping, get its parameters */
5918
5919 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5920 ret = rbd_dev_v2_striping_info(rbd_dev);
5921 if (ret < 0)
5922 goto out_err;
5923 }
2df3fac7 5924 /* No support for crypto and compression type format 2 images */
a30b71b9 5925
35152979 5926 return 0;
9d475de5 5927out_err:
642a2537 5928 rbd_dev->header.features = 0;
1e130199
AE
5929 kfree(rbd_dev->header.object_prefix);
5930 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
5931
5932 return ret;
a30b71b9
AE
5933}
5934
6d69bb53
ID
5935/*
5936 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5937 * rbd_dev_image_probe() recursion depth, which means it's also the
5938 * length of the already discovered part of the parent chain.
5939 */
5940static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
83a06263 5941{
2f82ee54 5942 struct rbd_device *parent = NULL;
124afba2
AE
5943 int ret;
5944
5945 if (!rbd_dev->parent_spec)
5946 return 0;
124afba2 5947
6d69bb53
ID
5948 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5949 pr_info("parent chain is too long (%d)\n", depth);
5950 ret = -EINVAL;
5951 goto out_err;
5952 }
5953
1643dfa4 5954 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
1f2c6651
ID
5955 if (!parent) {
5956 ret = -ENOMEM;
124afba2 5957 goto out_err;
1f2c6651
ID
5958 }
5959
5960 /*
5961 * Images related by parent/child relationships always share
5962 * rbd_client and spec/parent_spec, so bump their refcounts.
5963 */
5964 __rbd_get_client(rbd_dev->rbd_client);
5965 rbd_spec_get(rbd_dev->parent_spec);
124afba2 5966
6d69bb53 5967 ret = rbd_dev_image_probe(parent, depth);
124afba2
AE
5968 if (ret < 0)
5969 goto out_err;
1f2c6651 5970
124afba2 5971 rbd_dev->parent = parent;
a2acd00e 5972 atomic_set(&rbd_dev->parent_ref, 1);
124afba2 5973 return 0;
1f2c6651 5974
124afba2 5975out_err:
1f2c6651 5976 rbd_dev_unparent(rbd_dev);
1761b229 5977 rbd_dev_destroy(parent);
124afba2
AE
5978 return ret;
5979}
5980
811c6688
ID
5981/*
5982 * rbd_dev->header_rwsem must be locked for write and will be unlocked
5983 * upon return.
5984 */
200a6a8b 5985static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 5986{
83a06263 5987 int ret;
d1cf5788 5988
9b60e70b 5989 /* Record our major and minor device numbers. */
83a06263 5990
9b60e70b
ID
5991 if (!single_major) {
5992 ret = register_blkdev(0, rbd_dev->name);
5993 if (ret < 0)
1643dfa4 5994 goto err_out_unlock;
9b60e70b
ID
5995
5996 rbd_dev->major = ret;
5997 rbd_dev->minor = 0;
5998 } else {
5999 rbd_dev->major = rbd_major;
6000 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6001 }
83a06263
AE
6002
6003 /* Set up the blkdev mapping. */
6004
6005 ret = rbd_init_disk(rbd_dev);
6006 if (ret)
6007 goto err_out_blkdev;
6008
f35a4dee 6009 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
6010 if (ret)
6011 goto err_out_disk;
bc1ecc65 6012
f35a4dee 6013 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
22001f61 6014 set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
f35a4dee 6015
dd5ac32d
ID
6016 dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6017 ret = device_add(&rbd_dev->dev);
f35a4dee 6018 if (ret)
f5ee37bd 6019 goto err_out_mapping;
83a06263 6020
83a06263
AE
6021 /* Everything's ready. Announce the disk to the world. */
6022
129b79d4 6023 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
811c6688 6024 up_write(&rbd_dev->header_rwsem);
83a06263 6025
1643dfa4
ID
6026 spin_lock(&rbd_dev_list_lock);
6027 list_add_tail(&rbd_dev->node, &rbd_dev_list);
6028 spin_unlock(&rbd_dev_list_lock);
6029
811c6688 6030 add_disk(rbd_dev->disk);
ca7909e8
ID
6031 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6032 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6033 rbd_dev->header.features);
83a06263
AE
6034
6035 return ret;
2f82ee54 6036
f35a4dee
AE
6037err_out_mapping:
6038 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
6039err_out_disk:
6040 rbd_free_disk(rbd_dev);
6041err_out_blkdev:
9b60e70b
ID
6042 if (!single_major)
6043 unregister_blkdev(rbd_dev->major, rbd_dev->name);
811c6688
ID
6044err_out_unlock:
6045 up_write(&rbd_dev->header_rwsem);
83a06263
AE
6046 return ret;
6047}
6048
332bb12d
AE
6049static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6050{
6051 struct rbd_spec *spec = rbd_dev->spec;
c41d13a3 6052 int ret;
332bb12d
AE
6053
6054 /* Record the header object name for this rbd image. */
6055
6056 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6057
7627151e 6058 rbd_dev->header_oloc.pool = rbd_dev->layout.pool_id;
332bb12d 6059 if (rbd_dev->image_format == 1)
c41d13a3
ID
6060 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6061 spec->image_name, RBD_SUFFIX);
332bb12d 6062 else
c41d13a3
ID
6063 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6064 RBD_HEADER_PREFIX, spec->image_id);
332bb12d 6065
c41d13a3 6066 return ret;
332bb12d
AE
6067}
6068
200a6a8b
AE
6069static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6070{
6fd48b3b 6071 rbd_dev_unprobe(rbd_dev);
6fd48b3b
AE
6072 rbd_dev->image_format = 0;
6073 kfree(rbd_dev->spec->image_id);
6074 rbd_dev->spec->image_id = NULL;
6075
200a6a8b
AE
6076 rbd_dev_destroy(rbd_dev);
6077}
6078
a30b71b9
AE
6079/*
6080 * Probe for the existence of the header object for the given rbd
1f3ef788
AE
6081 * device. If this image is the one being mapped (i.e., not a
6082 * parent), initiate a watch on its header object before using that
6083 * object to get detailed information about the rbd image.
a30b71b9 6084 */
6d69bb53 6085static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
a30b71b9
AE
6086{
6087 int ret;
6088
6089 /*
3abef3b3
AE
6090 * Get the id from the image id object. Unless there's an
6091 * error, rbd_dev->spec->image_id will be filled in with
6092 * a dynamically-allocated string, and rbd_dev->image_format
6093 * will be set to either 1 or 2.
a30b71b9
AE
6094 */
6095 ret = rbd_dev_image_id(rbd_dev);
6096 if (ret)
c0fba368 6097 return ret;
c0fba368 6098
332bb12d
AE
6099 ret = rbd_dev_header_name(rbd_dev);
6100 if (ret)
6101 goto err_out_format;
6102
6d69bb53 6103 if (!depth) {
99d16943 6104 ret = rbd_register_watch(rbd_dev);
1fe48023
ID
6105 if (ret) {
6106 if (ret == -ENOENT)
6107 pr_info("image %s/%s does not exist\n",
6108 rbd_dev->spec->pool_name,
6109 rbd_dev->spec->image_name);
c41d13a3 6110 goto err_out_format;
1fe48023 6111 }
1f3ef788 6112 }
b644de2b 6113
a720ae09 6114 ret = rbd_dev_header_info(rbd_dev);
5655c4d9 6115 if (ret)
b644de2b 6116 goto err_out_watch;
83a06263 6117
04077599
ID
6118 /*
6119 * If this image is the one being mapped, we have pool name and
6120 * id, image name and id, and snap name - need to fill snap id.
6121 * Otherwise this is a parent image, identified by pool, image
6122 * and snap ids - need to fill in names for those ids.
6123 */
6d69bb53 6124 if (!depth)
04077599
ID
6125 ret = rbd_spec_fill_snap_id(rbd_dev);
6126 else
6127 ret = rbd_spec_fill_names(rbd_dev);
1fe48023
ID
6128 if (ret) {
6129 if (ret == -ENOENT)
6130 pr_info("snap %s/%s@%s does not exist\n",
6131 rbd_dev->spec->pool_name,
6132 rbd_dev->spec->image_name,
6133 rbd_dev->spec->snap_name);
33dca39f 6134 goto err_out_probe;
1fe48023 6135 }
9bb81c9b 6136
e8f59b59
ID
6137 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6138 ret = rbd_dev_v2_parent_info(rbd_dev);
6139 if (ret)
6140 goto err_out_probe;
6141
6142 /*
6143 * Need to warn users if this image is the one being
6144 * mapped and has a parent.
6145 */
6d69bb53 6146 if (!depth && rbd_dev->parent_spec)
e8f59b59
ID
6147 rbd_warn(rbd_dev,
6148 "WARNING: kernel layering is EXPERIMENTAL!");
6149 }
6150
6d69bb53 6151 ret = rbd_dev_probe_parent(rbd_dev, depth);
30d60ba2
AE
6152 if (ret)
6153 goto err_out_probe;
6154
6155 dout("discovered format %u image, header name is %s\n",
c41d13a3 6156 rbd_dev->image_format, rbd_dev->header_oid.name);
30d60ba2 6157 return 0;
e8f59b59 6158
6fd48b3b
AE
6159err_out_probe:
6160 rbd_dev_unprobe(rbd_dev);
b644de2b 6161err_out_watch:
6d69bb53 6162 if (!depth)
99d16943 6163 rbd_unregister_watch(rbd_dev);
332bb12d
AE
6164err_out_format:
6165 rbd_dev->image_format = 0;
5655c4d9
AE
6166 kfree(rbd_dev->spec->image_id);
6167 rbd_dev->spec->image_id = NULL;
a30b71b9
AE
6168 return ret;
6169}
6170
9b60e70b
ID
6171static ssize_t do_rbd_add(struct bus_type *bus,
6172 const char *buf,
6173 size_t count)
602adf40 6174{
cb8627c7 6175 struct rbd_device *rbd_dev = NULL;
dc79b113 6176 struct ceph_options *ceph_opts = NULL;
4e9afeba 6177 struct rbd_options *rbd_opts = NULL;
859c31df 6178 struct rbd_spec *spec = NULL;
9d3997fd 6179 struct rbd_client *rbdc;
51344a38 6180 bool read_only;
b51c83c2 6181 int rc;
602adf40
YS
6182
6183 if (!try_module_get(THIS_MODULE))
6184 return -ENODEV;
6185
602adf40 6186 /* parse add command */
859c31df 6187 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 6188 if (rc < 0)
dd5ac32d 6189 goto out;
78cea76e 6190
9d3997fd
AE
6191 rbdc = rbd_get_client(ceph_opts);
6192 if (IS_ERR(rbdc)) {
6193 rc = PTR_ERR(rbdc);
0ddebc0c 6194 goto err_out_args;
9d3997fd 6195 }
602adf40 6196
602adf40 6197 /* pick the pool */
30ba1f02 6198 rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
1fe48023
ID
6199 if (rc < 0) {
6200 if (rc == -ENOENT)
6201 pr_info("pool %s does not exist\n", spec->pool_name);
602adf40 6202 goto err_out_client;
1fe48023 6203 }
c0cd10db 6204 spec->pool_id = (u64)rc;
859c31df 6205
d147543d 6206 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
b51c83c2
ID
6207 if (!rbd_dev) {
6208 rc = -ENOMEM;
bd4ba655 6209 goto err_out_client;
b51c83c2 6210 }
c53d5893
AE
6211 rbdc = NULL; /* rbd_dev now owns this */
6212 spec = NULL; /* rbd_dev now owns this */
d147543d 6213 rbd_opts = NULL; /* rbd_dev now owns this */
602adf40 6214
0d6d1e9c
MC
6215 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6216 if (!rbd_dev->config_info) {
6217 rc = -ENOMEM;
6218 goto err_out_rbd_dev;
6219 }
6220
811c6688 6221 down_write(&rbd_dev->header_rwsem);
6d69bb53 6222 rc = rbd_dev_image_probe(rbd_dev, 0);
0d6d1e9c
MC
6223 if (rc < 0) {
6224 up_write(&rbd_dev->header_rwsem);
c53d5893 6225 goto err_out_rbd_dev;
0d6d1e9c 6226 }
05fd6f6f 6227
7ce4eef7
AE
6228 /* If we are mapping a snapshot it must be marked read-only */
6229
d147543d 6230 read_only = rbd_dev->opts->read_only;
7ce4eef7
AE
6231 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6232 read_only = true;
6233 rbd_dev->mapping.read_only = read_only;
6234
b536f69a 6235 rc = rbd_dev_device_setup(rbd_dev);
3abef3b3 6236 if (rc) {
e37180c0 6237 /*
99d16943 6238 * rbd_unregister_watch() can't be moved into
e37180c0
ID
6239 * rbd_dev_image_release() without refactoring, see
6240 * commit 1f3ef78861ac.
6241 */
99d16943 6242 rbd_unregister_watch(rbd_dev);
3abef3b3 6243 rbd_dev_image_release(rbd_dev);
dd5ac32d 6244 goto out;
3abef3b3
AE
6245 }
6246
dd5ac32d
ID
6247 rc = count;
6248out:
6249 module_put(THIS_MODULE);
6250 return rc;
b536f69a 6251
c53d5893
AE
6252err_out_rbd_dev:
6253 rbd_dev_destroy(rbd_dev);
bd4ba655 6254err_out_client:
9d3997fd 6255 rbd_put_client(rbdc);
0ddebc0c 6256err_out_args:
859c31df 6257 rbd_spec_put(spec);
d147543d 6258 kfree(rbd_opts);
dd5ac32d 6259 goto out;
602adf40
YS
6260}
6261
9b60e70b
ID
6262static ssize_t rbd_add(struct bus_type *bus,
6263 const char *buf,
6264 size_t count)
6265{
6266 if (single_major)
6267 return -EINVAL;
6268
6269 return do_rbd_add(bus, buf, count);
6270}
6271
6272static ssize_t rbd_add_single_major(struct bus_type *bus,
6273 const char *buf,
6274 size_t count)
6275{
6276 return do_rbd_add(bus, buf, count);
6277}
6278
dd5ac32d 6279static void rbd_dev_device_release(struct rbd_device *rbd_dev)
602adf40 6280{
602adf40 6281 rbd_free_disk(rbd_dev);
1643dfa4
ID
6282
6283 spin_lock(&rbd_dev_list_lock);
6284 list_del_init(&rbd_dev->node);
6285 spin_unlock(&rbd_dev_list_lock);
6286
200a6a8b 6287 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
dd5ac32d 6288 device_del(&rbd_dev->dev);
6d80b130 6289 rbd_dev_mapping_clear(rbd_dev);
9b60e70b
ID
6290 if (!single_major)
6291 unregister_blkdev(rbd_dev->major, rbd_dev->name);
602adf40
YS
6292}
6293
05a46afd
AE
6294static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6295{
ad945fc1 6296 while (rbd_dev->parent) {
05a46afd
AE
6297 struct rbd_device *first = rbd_dev;
6298 struct rbd_device *second = first->parent;
6299 struct rbd_device *third;
6300
6301 /*
6302 * Follow to the parent with no grandparent and
6303 * remove it.
6304 */
6305 while (second && (third = second->parent)) {
6306 first = second;
6307 second = third;
6308 }
ad945fc1 6309 rbd_assert(second);
8ad42cd0 6310 rbd_dev_image_release(second);
ad945fc1
AE
6311 first->parent = NULL;
6312 first->parent_overlap = 0;
6313
6314 rbd_assert(first->parent_spec);
05a46afd
AE
6315 rbd_spec_put(first->parent_spec);
6316 first->parent_spec = NULL;
05a46afd
AE
6317 }
6318}
6319
9b60e70b
ID
6320static ssize_t do_rbd_remove(struct bus_type *bus,
6321 const char *buf,
6322 size_t count)
602adf40
YS
6323{
6324 struct rbd_device *rbd_dev = NULL;
751cc0e3
AE
6325 struct list_head *tmp;
6326 int dev_id;
0276dca6 6327 char opt_buf[6];
82a442d2 6328 bool already = false;
0276dca6 6329 bool force = false;
0d8189e1 6330 int ret;
602adf40 6331
0276dca6
MC
6332 dev_id = -1;
6333 opt_buf[0] = '\0';
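	/*
	 * Expected input (sketch): the device id, optionally followed by
	 * "force", e.g. writing "0 force" to /sys/bus/rbd/remove to
	 * force-remove the mapping with dev_id 0 (hypothetical id).
	 */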
6334 sscanf(buf, "%d %5s", &dev_id, opt_buf);
6335 if (dev_id < 0) {
6336 pr_err("dev_id out of range\n");
602adf40 6337 return -EINVAL;
0276dca6
MC
6338 }
6339 if (opt_buf[0] != '\0') {
6340 if (!strcmp(opt_buf, "force")) {
6341 force = true;
6342 } else {
6343 pr_err("bad remove option at '%s'\n", opt_buf);
6344 return -EINVAL;
6345 }
6346 }
602adf40 6347
751cc0e3
AE
6348 ret = -ENOENT;
6349 spin_lock(&rbd_dev_list_lock);
6350 list_for_each(tmp, &rbd_dev_list) {
6351 rbd_dev = list_entry(tmp, struct rbd_device, node);
6352 if (rbd_dev->dev_id == dev_id) {
6353 ret = 0;
6354 break;
6355 }
42382b70 6356 }
751cc0e3
AE
6357 if (!ret) {
6358 spin_lock_irq(&rbd_dev->lock);
0276dca6 6359 if (rbd_dev->open_count && !force)
751cc0e3
AE
6360 ret = -EBUSY;
6361 else
82a442d2
AE
6362 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6363 &rbd_dev->flags);
751cc0e3
AE
6364 spin_unlock_irq(&rbd_dev->lock);
6365 }
6366 spin_unlock(&rbd_dev_list_lock);
82a442d2 6367 if (ret < 0 || already)
1ba0f1e7 6368 return ret;
751cc0e3 6369
0276dca6
MC
6370 if (force) {
6371 /*
6372 * Prevent new IO from being queued and wait for existing
6373 * IO to complete/fail.
6374 */
6375 blk_mq_freeze_queue(rbd_dev->disk->queue);
6376 blk_set_queue_dying(rbd_dev->disk->queue);
6377 }
6378
ed95b21a
ID
6379 down_write(&rbd_dev->lock_rwsem);
6380 if (__rbd_is_lock_owner(rbd_dev))
6381 rbd_unlock(rbd_dev);
6382 up_write(&rbd_dev->lock_rwsem);
99d16943 6383 rbd_unregister_watch(rbd_dev);
fca27065 6384
9875201e
JD
6385 /*
6386 * Don't free anything from rbd_dev->disk until after all
6387 * notifies are completely processed. Otherwise
6388 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
6389 * in a potential use after free of rbd_dev->disk or rbd_dev.
6390 */
dd5ac32d 6391 rbd_dev_device_release(rbd_dev);
8ad42cd0 6392 rbd_dev_image_release(rbd_dev);
aafb230e 6393
1ba0f1e7 6394 return count;
602adf40
YS
6395}
6396
9b60e70b
ID
6397static ssize_t rbd_remove(struct bus_type *bus,
6398 const char *buf,
6399 size_t count)
6400{
6401 if (single_major)
6402 return -EINVAL;
6403
6404 return do_rbd_remove(bus, buf, count);
6405}
6406
6407static ssize_t rbd_remove_single_major(struct bus_type *bus,
6408 const char *buf,
6409 size_t count)
6410{
6411 return do_rbd_remove(bus, buf, count);
6412}
6413
602adf40
YS
6414/*
6415 * create control files in sysfs
dfc5606d 6416 * /sys/bus/rbd/...
602adf40
YS
6417 */
6418static int rbd_sysfs_init(void)
6419{
dfc5606d 6420 int ret;
602adf40 6421
fed4c143 6422 ret = device_register(&rbd_root_dev);
21079786 6423 if (ret < 0)
dfc5606d 6424 return ret;
602adf40 6425
fed4c143
AE
6426 ret = bus_register(&rbd_bus_type);
6427 if (ret < 0)
6428 device_unregister(&rbd_root_dev);
602adf40 6429
602adf40
YS
6430 return ret;
6431}
6432
6433static void rbd_sysfs_cleanup(void)
6434{
dfc5606d 6435 bus_unregister(&rbd_bus_type);
fed4c143 6436 device_unregister(&rbd_root_dev);
602adf40
YS
6437}
6438
1c2a9dfe
AE
6439static int rbd_slab_init(void)
6440{
6441 rbd_assert(!rbd_img_request_cache);
03d94406 6442 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
868311b1
AE
6443 if (!rbd_img_request_cache)
6444 return -ENOMEM;
6445
6446 rbd_assert(!rbd_obj_request_cache);
03d94406 6447 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
78c2a44a
AE
6448 if (!rbd_obj_request_cache)
6449 goto out_err;
6450
6451 rbd_assert(!rbd_segment_name_cache);
6452 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
2d0ebc5d 6453 CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
78c2a44a 6454 if (rbd_segment_name_cache)
1c2a9dfe 6455 return 0;
78c2a44a 6456out_err:
13bf2834
JL
6457 kmem_cache_destroy(rbd_obj_request_cache);
6458 rbd_obj_request_cache = NULL;
1c2a9dfe 6459
868311b1
AE
6460 kmem_cache_destroy(rbd_img_request_cache);
6461 rbd_img_request_cache = NULL;
6462
1c2a9dfe
AE
6463 return -ENOMEM;
6464}
6465
6466static void rbd_slab_exit(void)
6467{
78c2a44a
AE
6468 rbd_assert(rbd_segment_name_cache);
6469 kmem_cache_destroy(rbd_segment_name_cache);
6470 rbd_segment_name_cache = NULL;
6471
868311b1
AE
6472 rbd_assert(rbd_obj_request_cache);
6473 kmem_cache_destroy(rbd_obj_request_cache);
6474 rbd_obj_request_cache = NULL;
6475
1c2a9dfe
AE
6476 rbd_assert(rbd_img_request_cache);
6477 kmem_cache_destroy(rbd_img_request_cache);
6478 rbd_img_request_cache = NULL;
6479}
6480
cc344fa1 6481static int __init rbd_init(void)
602adf40
YS
6482{
6483 int rc;
6484
1e32d34c
AE
6485 if (!libceph_compatible(NULL)) {
6486 rbd_warn(NULL, "libceph incompatibility (quitting)");
1e32d34c
AE
6487 return -EINVAL;
6488 }
e1b4d96d 6489
1c2a9dfe 6490 rc = rbd_slab_init();
602adf40
YS
6491 if (rc)
6492 return rc;
e1b4d96d 6493
f5ee37bd
ID
6494 /*
6495 * The number of active work items is limited by the number of
f77303bd 6496 * rbd devices * queue depth, so leave @max_active at default.
f5ee37bd
ID
6497 */
6498 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6499 if (!rbd_wq) {
6500 rc = -ENOMEM;
6501 goto err_out_slab;
6502 }
6503
9b60e70b
ID
6504 if (single_major) {
6505 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6506 if (rbd_major < 0) {
6507 rc = rbd_major;
f5ee37bd 6508 goto err_out_wq;
9b60e70b
ID
6509 }
6510 }
6511
1c2a9dfe
AE
6512 rc = rbd_sysfs_init();
6513 if (rc)
9b60e70b
ID
6514 goto err_out_blkdev;
6515
6516 if (single_major)
6517 pr_info("loaded (major %d)\n", rbd_major);
6518 else
6519 pr_info("loaded\n");
1c2a9dfe 6520
e1b4d96d
ID
6521 return 0;
6522
9b60e70b
ID
6523err_out_blkdev:
6524 if (single_major)
6525 unregister_blkdev(rbd_major, RBD_DRV_NAME);
f5ee37bd
ID
6526err_out_wq:
6527 destroy_workqueue(rbd_wq);
e1b4d96d
ID
6528err_out_slab:
6529 rbd_slab_exit();
1c2a9dfe 6530 return rc;
602adf40
YS
6531}
6532
cc344fa1 6533static void __exit rbd_exit(void)
602adf40 6534{
ffe312cf 6535 ida_destroy(&rbd_dev_id_ida);
602adf40 6536 rbd_sysfs_cleanup();
9b60e70b
ID
6537 if (single_major)
6538 unregister_blkdev(rbd_major, RBD_DRV_NAME);
f5ee37bd 6539 destroy_workqueue(rbd_wq);
1c2a9dfe 6540 rbd_slab_exit();
602adf40
YS
6541}
6542
6543module_init(rbd_init);
6544module_exit(rbd_exit);
6545
d552c619 6546MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
602adf40
YS
6547MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6548MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
602adf40
YS
6549/* following authorship retained from original osdblk.c */
6550MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6551
90da258b 6552MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
602adf40 6553MODULE_LICENSE("GPL");