
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
        unsigned int counter;

        counter = (unsigned int)__atomic_add_unless(v, 1, 0);
        if (counter <= (unsigned int)INT_MAX)
                return (int)counter;

        atomic_dec(v);

        return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
        int counter;

        counter = atomic_dec_return(v);
        if (counter >= 0)
                return counter;

        atomic_inc(v);

        return -EINVAL;
}

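/*
 * Illustrative sketch (not part of the driver): these saturating
 * helpers back reference counts that must neither wrap nor be
 * revived, such as rbd_dev->parent_ref.  A caller can distinguish
 * three outcomes:
 *
 *      int counter = atomic_inc_return_safe(&v);
 *
 *      counter > 0:  incremented normally
 *      counter == 0: count was 0 and was left alone (being torn down)
 *      counter < 0:  count pinned at INT_MAX (-EINVAL)
 */
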
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR            256
#define RBD_SINGLE_MAJOR_PART_SHIFT     4

#define RBD_MAX_PARENT_CHAIN_LEN        16

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

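/*
 * For example: with NAME_MAX 255 and the 5-byte "snap_" prefix,
 * RBD_MAX_SNAP_NAME_LEN works out to 250 characters.
 */
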
#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

#define RBD_NOTIFY_TIMEOUT      5       /* seconds */
#define RBD_RETRY_DELAY         msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING            (1<<0)
#define RBD_FEATURE_STRIPINGV2          (1<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK      (1<<2)
#define RBD_FEATURE_DATA_POOL           (1<<7)
#define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
                                 RBD_FEATURE_STRIPINGV2 |       \
                                 RBD_FEATURE_EXCLUSIVE_LOCK |   \
                                 RBD_FEATURE_DATA_POOL)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN            32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        u64 stripe_unit;
        u64 stripe_count;
        s64 data_pool_id;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};

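/*
 * Illustrative example (not part of the driver): mapping pool "rbd",
 * image "foo", snapshot "snap1" produces a spec whose names are given
 * by the user and whose ids are then looked up -- hypothetically
 * pool_id 2, image_id "10052ae8944a", snap_id 4.  Mapping the base
 * image instead uses snap_id CEPH_NOSNAP and snap_name "-"
 * (RBD_SNAP_HEAD_NAME).
 */
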
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
        OBJ_OP_WRITE,
        OBJ_OP_READ,
        OBJ_OP_DISCARD,
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
        IMG_REQ_DISCARD,        /* discard: normal = 0, discard request = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

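/*
 * Illustrative use of the iterators above (not part of the driver):
 *
 *      struct rbd_obj_request *obj_request;
 *      u64 total = 0;
 *
 *      for_each_obj_request(img_request, obj_request)
 *              total += obj_request->xferred;
 *
 * This mirrors how rbd_img_request_complete() below aggregates the
 * per-object transfer counts.
 */
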
enum rbd_watch_state {
        RBD_WATCH_STATE_UNREGISTERED,
        RBD_WATCH_STATE_REGISTERED,
        RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
        RBD_LOCK_STATE_UNLOCKED,
        RBD_LOCK_STATE_LOCKED,
        RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
        u64 gid;
        u64 handle;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        int                     minor;
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;
        struct rbd_options      *opts;
        char                    *config_info;   /* add{,_single_major} string */

        struct ceph_object_id   header_oid;
        struct ceph_object_locator header_oloc;

        struct ceph_file_layout layout;         /* used for all rbd requests */

        struct mutex            watch_mutex;
        enum rbd_watch_state    watch_state;
        struct ceph_osd_linger_request *watch_handle;
        u64                     watch_cookie;
        struct delayed_work     watch_dwork;

        struct rw_semaphore     lock_rwsem;
        enum rbd_lock_state     lock_state;
        struct rbd_client_id    owner_cid;
        struct work_struct      acquired_lock_work;
        struct work_struct      released_lock_work;
        struct delayed_work     lock_dwork;
        struct work_struct      unlock_work;
        wait_queue_head_t       lock_waitq;

        struct workqueue_struct *task_wq;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        atomic_t                parent_ref;
        struct rbd_device       *parent;

        /* Block layer tags. */
        struct blk_mq_tag_set   tag_set;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
        RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};

static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;
static struct kmem_cache        *rbd_segment_name_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * Default to false for now, as single-major requires >= 0.75 version of
 * userspace rbd utility.
 */
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
                                    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
                                       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
static void rbd_spec_put(struct rbd_spec *spec);

static int rbd_dev_id_to_minor(int dev_id)
{
        return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
        return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}

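/*
 * Illustrative sketch (not part of the driver): with
 * RBD_SINGLE_MAJOR_PART_SHIFT == 4, each device gets 16 minors --
 * dev_id 0 maps to minor 0, dev_id 1 to minor 16, dev_id 2 to
 * minor 32 -- leaving minors 1..15, 17..31, ... for partitions.
 */
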
static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
{
        return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
               rbd_dev->spec->snap_id == CEPH_NOSNAP &&
               !rbd_dev->mapping.read_only;
}

static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
        return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
               rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
        bool is_lock_owner;

        down_read(&rbd_dev->lock_rwsem);
        is_lock_owner = __rbd_is_lock_owner(rbd_dev);
        up_read(&rbd_dev->lock_rwsem);
        return is_lock_owner;
}

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);

static struct attribute *rbd_bus_attrs[] = {
        &bus_attr_add.attr,
        &bus_attr_remove.attr,
        &bus_attr_add_single_major.attr,
        &bus_attr_remove_single_major.attr,
        NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
                                  struct attribute *attr, int index)
{
        if (!single_major &&
            (attr == &bus_attr_add_single_major.attr ||
             attr == &bus_attr_remove_single_major.attr))
                return 0;

        return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
        .attrs = rbd_bus_attrs,
        .is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_groups     = rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        (void) get_device(&rbd_dev->dev);

        return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
        int ret = 0;
        int val;
        bool ro;
        bool ro_changed = false;

        /* get_user() may sleep, so call it before taking rbd_dev->lock */
        if (get_user(val, (int __user *)(arg)))
                return -EFAULT;

        ro = val ? true : false;
        /* Snapshots can't be written to */
        if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        /* prevent others from opening this device */
        if (rbd_dev->open_count > 1) {
                ret = -EBUSY;
                goto out;
        }

        if (rbd_dev->mapping.read_only != ro) {
                rbd_dev->mapping.read_only = ro;
                ro_changed = true;
        }

out:
        spin_unlock_irq(&rbd_dev->lock);
        /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
        if (ret == 0 && ro_changed)
                set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

        return ret;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
                        unsigned int cmd, unsigned long arg)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        int ret = 0;

        switch (cmd) {
        case BLKROSET:
                ret = rbd_ioctl_set_ro(rbd_dev, arg);
                break;
        default:
                ret = -ENOTTY;
        }

        return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
                                unsigned int cmd, unsigned long arg)
{
        return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
        .ioctl                  = rbd_ioctl,
#ifdef CONFIG_COMPAT
        .compat_ioctl           = rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_rbdc;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_client;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;
out_client:
        ceph_destroy_client(rbdc->client);
out_rbdc:
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
        Opt_queue_depth,
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        Opt_lock_on_read,
        Opt_err
};

static match_table_t rbd_opts_tokens = {
        {Opt_queue_depth, "queue_depth=%d"},
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        {Opt_lock_on_read, "lock_on_read"},
        {Opt_err, NULL}
};

struct rbd_options {
        int     queue_depth;
        bool    read_only;
        bool    lock_on_read;
};

#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT   false
#define RBD_LOCK_ON_READ_DEFAULT false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token, argstr[0].from);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_queue_depth:
                if (intval < 1) {
                        pr_err("queue_depth out of range\n");
                        return -EINVAL;
                }
                rbd_opts->queue_depth = intval;
                break;
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        case Opt_lock_on_read:
                rbd_opts->lock_on_read = true;
                break;
        default:
                /* libceph prints "bad option" msg */
                return -EINVAL;
        }

        return 0;
}

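/*
 * Illustrative example (not part of the driver): the option string
 * handed to "rbd map" is parsed one token at a time, so e.g.
 *
 *      queue_depth=128,lock_on_read,ro
 *
 * results in parse_rbd_opts_token() being called three times,
 * setting queue_depth to 128 and both bool options to true.
 */
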
static char *obj_op_name(enum obj_operation_type op_type)
{
        switch (op_type) {
        case OBJ_OP_READ:
                return "read";
        case OBJ_OP_WRITE:
                return "write";
        case OBJ_OP_DISCARD:
                return "discard";
        default:
                return "???";
        }
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);
        mutex_unlock(&client_mutex);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock to unlink the client, so the caller
 * must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * returns the size of an object in the image
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1U << header->obj_order;
}

static void rbd_init_layout(struct rbd_device *rbd_dev)
{
        if (rbd_dev->header.stripe_unit == 0 ||
            rbd_dev->header.stripe_count == 0) {
                rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
                rbd_dev->header.stripe_count = 1;
        }

        rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
        rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
        rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
        rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
                          rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
        RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                object_prefix = kstrndup(ondisk->object_prefix,
                                         sizeof(ondisk->object_prefix),
                                         GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */
                snap_sizes = kmalloc_array(snap_count,
                                           sizeof(*header->snap_sizes),
                                           GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more, fill in the header */

        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                rbd_init_layout(rbd_dev);
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

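/*
 * Illustrative example (not part of the driver): with snaps[] =
 * { 12, 7, 4 } (descending, newest first), looking up id 7 makes
 * bsearch() return &snaps[1], so rbd_dev_snap_index() yields 1;
 * looking up id 5 finds nothing and yields BAD_SNAP_INDEX.
 */
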
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;
        const char *snap_name;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return ERR_PTR(-ENOENT);

        snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
        return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        const char *name_format = rbd_dev->image_format == 1 ?
                                      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
        char *name;
        u64 segment;
        int ret;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                rbd_segment_name_free(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = rbd_obj_bytes(&rbd_dev->header);

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = rbd_obj_bytes(&rbd_dev->header);

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}

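/*
 * Illustrative example (not part of the driver): with the default
 * 4 MB objects (obj_order == 22), an image byte offset of 9 MB falls
 * in segment 2 (9 MB >> 22), at offset 1 MB within that object, and
 * a 5 MB request starting there would be clipped by
 * rbd_segment_length() to the 3 MB remaining in the object.
 */
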
/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec bv;
        struct bvec_iter iter;
        unsigned long flags;
        void *buf;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, iter) {
                        if (pos + bv.bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(&bv, &flags);
                                memset(buf + remainder, 0,
                                       bv.bv_len - remainder);
                                flush_dcache_page(bv.bv_page);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv.bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = offset & ~PAGE_MASK;
                length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                flush_dcache_page(*page);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bio;

        bio = bio_clone(bio_src, gfpmask);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio_advance(bio, offset);
        bio->bi_iter.bi_size = len;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_iter.bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_iter.bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}

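/*
 * Illustrative example (not part of the driver): given a source
 * chain of two 8 KB bios and a request for 12 KB starting at byte
 * offset 2 KB, bio_chain_clone_range() produces a clone of the last
 * 6 KB of the first bio chained to a clone of the first 6 KB of the
 * second, and leaves *bio_src/*offset pointing at byte 6 KB of the
 * second bio for the next caller.
 */
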
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the response from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

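/*
 * Illustrative sketch (not part of the driver) of how the two flags
 * above combine into three states for a layered-write target object:
 *
 *      if (!obj_request_known_test(obj_request))
 *              ... existence not yet determined, issue a STAT ...
 *      else if (obj_request_exists_test(obj_request))
 *              ... write directly to the object ...
 *      else
 *              ... copy up parent data first ...
 */
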
static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
        struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;

        return obj_request->img_offset <
            round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
             atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        if (img_request_child_test(img_request))
                kref_put(&img_request->kref, rbd_parent_request_destroy);
        else
                kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);

static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
{
        struct ceph_osd_request *osd_req = obj_request->osd_req;

        dout("%s %p \"%s\" %llu~%llu osd_req %p\n", __func__,
             obj_request, obj_request->object_name, obj_request->offset,
             obj_request->length, osd_req);
        if (obj_request_img_data_test(obj_request)) {
                WARN_ON(obj_request->callback != rbd_img_obj_callback);
                rbd_img_request_get(obj_request->img_request);
        }
        ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

0c425248
AE
1668/*
1669 * The default/initial value for all image request flags is 0. Each
1670 * is conditionally set to 1 at image request initialization time
1671 * and currently never change thereafter.
1672 */
1673static void img_request_write_set(struct rbd_img_request *img_request)
1674{
1675 set_bit(IMG_REQ_WRITE, &img_request->flags);
1676 smp_mb();
1677}
1678
1679static bool img_request_write_test(struct rbd_img_request *img_request)
1680{
1681 smp_mb();
1682 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1683}
1684
90e98c52
GZ
1685/*
1686 * Set the discard flag when the img_request is an discard request
1687 */
1688static void img_request_discard_set(struct rbd_img_request *img_request)
1689{
1690 set_bit(IMG_REQ_DISCARD, &img_request->flags);
1691 smp_mb();
1692}
1693
1694static bool img_request_discard_test(struct rbd_img_request *img_request)
1695{
1696 smp_mb();
1697 return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1698}
1699
9849e986
AE
1700static void img_request_child_set(struct rbd_img_request *img_request)
1701{
1702 set_bit(IMG_REQ_CHILD, &img_request->flags);
1703 smp_mb();
1704}
1705
e93f3152
AE
1706static void img_request_child_clear(struct rbd_img_request *img_request)
1707{
1708 clear_bit(IMG_REQ_CHILD, &img_request->flags);
1709 smp_mb();
1710}
1711
9849e986
AE
1712static bool img_request_child_test(struct rbd_img_request *img_request)
1713{
1714 smp_mb();
1715 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1716}
1717
d0b2e944
AE
1718static void img_request_layered_set(struct rbd_img_request *img_request)
1719{
1720 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1721 smp_mb();
1722}
1723
a2acd00e
AE
1724static void img_request_layered_clear(struct rbd_img_request *img_request)
1725{
1726 clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1727 smp_mb();
1728}
1729
1730static bool img_request_layered_test(struct rbd_img_request *img_request)
1731{
1732 smp_mb();
1733 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1734}
1735
1736static enum obj_operation_type
1737rbd_img_request_op_type(struct rbd_img_request *img_request)
1738{
1739 if (img_request_write_test(img_request))
1740 return OBJ_OP_WRITE;
1741 else if (img_request_discard_test(img_request))
1742 return OBJ_OP_DISCARD;
1743 else
1744 return OBJ_OP_READ;
1745}
1746
1747static void
1748rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1749{
1750 u64 xferred = obj_request->xferred;
1751 u64 length = obj_request->length;
1752
1753 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1754 obj_request, obj_request->img_request, obj_request->result,
b9434c5b 1755 xferred, length);
6e2a4505 1756 /*
1757 * ENOENT means a hole in the image. We zero-fill the entire
1758 * length of the request. A short read also implies zero-fill
1759 * to the end of the request. An error requires the whole
1760 * length of the request to be reported finished with an error
1761 * to the block layer. In each case we update the xferred
1762 * count to indicate the whole request was satisfied.
6e2a4505 1763 */
b9434c5b 1764 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
6e2a4505 1765 if (obj_request->result == -ENOENT) {
1766 if (obj_request->type == OBJ_REQUEST_BIO)
1767 zero_bio_chain(obj_request->bio_list, 0);
1768 else
1769 zero_pages(obj_request->pages, 0, length);
6e2a4505 1770 obj_request->result = 0;
1771 } else if (xferred < length && !obj_request->result) {
1772 if (obj_request->type == OBJ_REQUEST_BIO)
1773 zero_bio_chain(obj_request->bio_list, xferred);
1774 else
1775 zero_pages(obj_request->pages, xferred, length);
6e2a4505 1776 }
17c1cc1d 1777 obj_request->xferred = length;
1778 obj_request_done_set(obj_request);
1779}
1780
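/*
 * Editor's illustrative sketch -- not part of rbd.c. The zero-fill rule
 * described above, applied to a flat buffer: a hole (-ENOENT) zeroes the
 * whole extent, a successful short read zeroes only the unread tail, and
 * either way the caller reports the full length as transferred. demo_*
 * names are hypothetical.
 */
#include <errno.h>
#include <stdint.h>
#include <string.h>

static uint64_t demo_read_fixup(char *buf, uint64_t length,
				uint64_t xferred, int result)
{
	if (result == -ENOENT)
		memset(buf, 0, length);		/* hole: zero everything */
	else if (!result && xferred < length)
		memset(buf + xferred, 0, length - xferred); /* short read */

	return length;		/* whole request counts as satisfied */
}

int main(void)
{
	char buf[8] = "AAAAAAA";

	demo_read_fixup(buf, sizeof(buf), 3, 0);	/* zeroes buf[3..7] */
	return buf[3];					/* 0 */
}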
1781static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1782{
1783 dout("%s: obj %p cb %p\n", __func__, obj_request,
1784 obj_request->callback);
1785 if (obj_request->callback)
1786 obj_request->callback(obj_request);
1787 else
1788 complete_all(&obj_request->completion);
1789}
1790
1791static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1792{
1793 obj_request->result = err;
1794 obj_request->xferred = 0;
1795 /*
1796 * kludge - mirror rbd_obj_request_submit() to match a put in
1797 * rbd_img_obj_callback()
1798 */
1799 if (obj_request_img_data_test(obj_request)) {
1800 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1801 rbd_img_request_get(obj_request->img_request);
1802 }
1803 obj_request_done_set(obj_request);
1804 rbd_obj_request_complete(obj_request);
1805}
1806
c47f9371 1807static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1808{
57acbaa7 1809 struct rbd_img_request *img_request = NULL;
a9e8ba2c 1810 struct rbd_device *rbd_dev = NULL;
1811 bool layered = false;
1812
1813 if (obj_request_img_data_test(obj_request)) {
1814 img_request = obj_request->img_request;
1815 layered = img_request && img_request_layered_test(img_request);
a9e8ba2c 1816 rbd_dev = img_request->rbd_dev;
57acbaa7 1817 }
1818
1819 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1820 obj_request, img_request, obj_request->result,
1821 obj_request->xferred, obj_request->length);
1822 if (layered && obj_request->result == -ENOENT &&
1823 obj_request->img_offset < rbd_dev->parent_overlap)
1824 rbd_img_parent_read(obj_request);
1825 else if (img_request)
1826 rbd_img_obj_request_read_callback(obj_request);
1827 else
1828 obj_request_done_set(obj_request);
1829}
1830
c47f9371 1831static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1832{
1833 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1834 obj_request->result, obj_request->length);
1835 /*
1836 * There is no such thing as a successful short write. Set
1837 * it to our originally-requested length.
1838 */
1839 obj_request->xferred = obj_request->length;
07741308 1840 obj_request_done_set(obj_request);
1841}
1842
1843static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1844{
1845 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1846 obj_request->result, obj_request->length);
1847 /*
1848 * There is no such thing as a successful short discard. Set
1849 * it to our originally-requested length.
1850 */
1851 obj_request->xferred = obj_request->length;
1852 /* discarding a non-existent object is not a problem */
1853 if (obj_request->result == -ENOENT)
1854 obj_request->result = 0;
1855 obj_request_done_set(obj_request);
1856}
1857
1858/*
1859 * For a simple stat call there's nothing to do. We'll do more if
1860 * this is part of a write sequence for a layered image.
1861 */
c47f9371 1862static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
fbfab539 1863{
37206ee5 1864 dout("%s: obj %p\n", __func__, obj_request);
1865 obj_request_done_set(obj_request);
1866}
1867
1868static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1869{
1870 dout("%s: obj %p\n", __func__, obj_request);
1871
1872 if (obj_request_img_data_test(obj_request))
1873 rbd_osd_copyup_callback(obj_request);
1874 else
1875 obj_request_done_set(obj_request);
1876}
1877
85e084fe 1878static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1879{
1880 struct rbd_obj_request *obj_request = osd_req->r_priv;
1881 u16 opcode;
1882
85e084fe 1883 dout("%s: osd_req %p\n", __func__, osd_req);
bf0d5f50 1884 rbd_assert(osd_req == obj_request->osd_req);
1885 if (obj_request_img_data_test(obj_request)) {
1886 rbd_assert(obj_request->img_request);
1887 rbd_assert(obj_request->which != BAD_WHICH);
1888 } else {
1889 rbd_assert(obj_request->which == BAD_WHICH);
1890 }
bf0d5f50 1891
1892 if (osd_req->r_result < 0)
1893 obj_request->result = osd_req->r_result;
bf0d5f50 1894
1895 /*
1896 * We support a 64-bit length, but ultimately it has to be
1897 * passed to the block layer, which just supports a 32-bit
1898 * length field.
c47f9371 1899 */
7665d85b 1900 obj_request->xferred = osd_req->r_ops[0].outdata_len;
8b3e1a56 1901 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
0ccd5926 1902
79528734 1903 opcode = osd_req->r_ops[0].op;
1904 switch (opcode) {
1905 case CEPH_OSD_OP_READ:
c47f9371 1906 rbd_osd_read_callback(obj_request);
bf0d5f50 1907 break;
0ccd5926 1908 case CEPH_OSD_OP_SETALLOCHINT:
1909 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1910 osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
0ccd5926 1911 /* fall through */
bf0d5f50 1912 case CEPH_OSD_OP_WRITE:
e30b7577 1913 case CEPH_OSD_OP_WRITEFULL:
c47f9371 1914 rbd_osd_write_callback(obj_request);
bf0d5f50 1915 break;
fbfab539 1916 case CEPH_OSD_OP_STAT:
c47f9371 1917 rbd_osd_stat_callback(obj_request);
fbfab539 1918 break;
1919 case CEPH_OSD_OP_DELETE:
1920 case CEPH_OSD_OP_TRUNCATE:
1921 case CEPH_OSD_OP_ZERO:
1922 rbd_osd_discard_callback(obj_request);
1923 break;
36be9a76 1924 case CEPH_OSD_OP_CALL:
1925 rbd_osd_call_callback(obj_request);
1926 break;
bf0d5f50 1927 default:
9584d508 1928 rbd_warn(NULL, "%s: unsupported op %hu",
1929 obj_request->object_name, (unsigned short) opcode);
1930 break;
1931 }
1932
07741308 1933 if (obj_request_done_test(obj_request))
1934 rbd_obj_request_complete(obj_request);
1935}
1936
9d4df01f 1937static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
430c28c3 1938{
8c042b0d 1939 struct ceph_osd_request *osd_req = obj_request->osd_req;
430c28c3 1940
1941 rbd_assert(obj_request_img_data_test(obj_request));
1942 osd_req->r_snapid = obj_request->img_request->snap_id;
1943}
1944
1945static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1946{
9d4df01f 1947 struct ceph_osd_request *osd_req = obj_request->osd_req;
9d4df01f 1948
1949 osd_req->r_mtime = CURRENT_TIME;
1950 osd_req->r_data_offset = obj_request->offset;
1951}
1952
1953static struct ceph_osd_request *
1954__rbd_osd_req_create(struct rbd_device *rbd_dev,
1955 struct ceph_snap_context *snapc,
1956 int num_ops, unsigned int flags,
1957 struct rbd_obj_request *obj_request)
1958{
1959 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1960 struct ceph_osd_request *req;
1961
1962 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1963 if (!req)
1964 return NULL;
1965
1966 req->r_flags = flags;
1967 req->r_callback = rbd_osd_req_callback;
1968 req->r_priv = obj_request;
1969
1970 req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1971 if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, "%s",
1972 obj_request->object_name))
1973 goto err_req;
1974
1975 if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1976 goto err_req;
1977
1978 return req;
1979
1980err_req:
1981 ceph_osdc_put_request(req);
1982 return NULL;
1983}
1984
1985/*
1986 * Create an osd request. A read request has one osd op (read).
1987 * A write request has either one (watch) or two (hint+write) osd ops.
1988 * (All rbd data writes are prefixed with an allocation hint op, but
1989 * technically osd watch is a write request, hence this distinction.)
1990 */
1991static struct ceph_osd_request *rbd_osd_req_create(
1992 struct rbd_device *rbd_dev,
6d2940c8 1993 enum obj_operation_type op_type,
deb236b3 1994 unsigned int num_ops,
430c28c3 1995 struct rbd_obj_request *obj_request)
bf0d5f50 1996{
bf0d5f50 1997 struct ceph_snap_context *snapc = NULL;
bf0d5f50 1998
1999 if (obj_request_img_data_test(obj_request) &&
2000 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
6365d33a 2001 struct rbd_img_request *img_request = obj_request->img_request;
2002 if (op_type == OBJ_OP_WRITE) {
2003 rbd_assert(img_request_write_test(img_request));
2004 } else {
2005 rbd_assert(img_request_discard_test(img_request));
2006 }
6d2940c8 2007 snapc = img_request->snapc;
2008 }
2009
6d2940c8 2010 rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
deb236b3 2011
2012 return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
2013 (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
2014 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK : CEPH_OSD_FLAG_READ,
2015 obj_request);
2016}
2017
0eefd470 2018/*
2019 * Create a copyup osd request based on the information in the object
2020 * request supplied. A copyup request has two or three osd ops, a
2021 * copyup method call, potentially a hint op, and a write or truncate
2022 * or zero op.
2023 */
2024static struct ceph_osd_request *
2025rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
2026{
2027 struct rbd_img_request *img_request;
d3246fb0 2028 int num_osd_ops = 3;
0eefd470
AE
2029
2030 rbd_assert(obj_request_img_data_test(obj_request));
2031 img_request = obj_request->img_request;
2032 rbd_assert(img_request);
2033 rbd_assert(img_request_write_test(img_request) ||
2034 img_request_discard_test(img_request));
0eefd470 2035
2036 if (img_request_discard_test(img_request))
2037 num_osd_ops = 2;
2038
2039 return __rbd_osd_req_create(img_request->rbd_dev,
2040 img_request->snapc, num_osd_ops,
2041 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
2042 obj_request);
2043}
2044
2045static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2046{
2047 ceph_osdc_put_request(osd_req);
2048}
2049
2050/* object_name is assumed to be a non-null pointer and NUL-terminated */
2051
2052static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
2053 enum obj_request_type type)
2054{
2055 struct rbd_obj_request *obj_request;
2056 size_t size;
2057 char *name;
2058
2059 rbd_assert(obj_request_type_valid(type));
2060
2061 size = strlen(object_name) + 1;
5a60e876 2062 name = kmalloc(size, GFP_NOIO);
f907ad55 2063 if (!name)
2064 return NULL;
2065
5a60e876 2066 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
2067 if (!obj_request) {
2068 kfree(name);
2069 return NULL;
2070 }
2071
bf0d5f50 2072 obj_request->object_name = memcpy(name, object_name, size);
2073 obj_request->which = BAD_WHICH;
2074 obj_request->type = type;
2075 INIT_LIST_HEAD(&obj_request->links);
788e2df3 2076 init_completion(&obj_request->completion);
2077 kref_init(&obj_request->kref);
2078
67e2b652 2079 dout("%s %p\n", __func__, obj_request);
2080 return obj_request;
2081}
2082
2083static void rbd_obj_request_destroy(struct kref *kref)
2084{
2085 struct rbd_obj_request *obj_request;
2086
2087 obj_request = container_of(kref, struct rbd_obj_request, kref);
2088
2089 dout("%s: obj %p\n", __func__, obj_request);
2090
2091 rbd_assert(obj_request->img_request == NULL);
2092 rbd_assert(obj_request->which == BAD_WHICH);
2093
2094 if (obj_request->osd_req)
2095 rbd_osd_req_destroy(obj_request->osd_req);
2096
2097 rbd_assert(obj_request_type_valid(obj_request->type));
2098 switch (obj_request->type) {
2099 case OBJ_REQUEST_NODATA:
2100 break; /* Nothing to do */
2101 case OBJ_REQUEST_BIO:
2102 if (obj_request->bio_list)
2103 bio_chain_put(obj_request->bio_list);
2104 break;
788e2df3 2105 case OBJ_REQUEST_PAGES:
2106 /* img_data requests don't own their page array */
2107 if (obj_request->pages &&
2108 !obj_request_img_data_test(obj_request))
2109 ceph_release_page_vector(obj_request->pages,
2110 obj_request->page_count);
2111 break;
2112 }
2113
f907ad55 2114 kfree(obj_request->object_name);
2115 obj_request->object_name = NULL;
2116 kmem_cache_free(rbd_obj_request_cache, obj_request);
2117}
2118
2119/* It's OK to call this for a device with no parent */
2120
2121static void rbd_spec_put(struct rbd_spec *spec);
2122static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2123{
2124 rbd_dev_remove_parent(rbd_dev);
2125 rbd_spec_put(rbd_dev->parent_spec);
2126 rbd_dev->parent_spec = NULL;
2127 rbd_dev->parent_overlap = 0;
2128}
2129
2130/*
2131 * Parent image reference counting is used to determine when an
2132 * image's parent fields can be safely torn down--after there are no
2133 * more in-flight requests to the parent image. When the last
2134 * reference is dropped, cleaning them up is safe.
2135 */
2136static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2137{
2138 int counter;
2139
2140 if (!rbd_dev->parent_spec)
2141 return;
2142
2143 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2144 if (counter > 0)
2145 return;
2146
2147 /* Last reference; clean up parent data structures */
2148
2149 if (!counter)
2150 rbd_dev_unparent(rbd_dev);
2151 else
9584d508 2152 rbd_warn(rbd_dev, "parent reference underflow");
2153}
2154
2155/*
2156 * If an image has a non-zero parent overlap, get a reference to its
2157 * parent.
2158 *
2159 * Returns true if the rbd device has a parent with a non-zero
2160 * overlap and a reference for it was successfully taken, or
2161 * false otherwise.
2162 */
2163static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2164{
ae43e9d0 2165 int counter = 0;
2166
2167 if (!rbd_dev->parent_spec)
2168 return false;
2169
2170 down_read(&rbd_dev->header_rwsem);
2171 if (rbd_dev->parent_overlap)
2172 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2173 up_read(&rbd_dev->header_rwsem);
2174
2175 if (counter < 0)
9584d508 2176 rbd_warn(rbd_dev, "parent reference overflow");
a2acd00e 2177
ae43e9d0 2178 return counter > 0;
2179}
2180
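/*
 * Editor's illustrative sketch -- not part of rbd.c. The intended
 * pairing for the two helpers above: a successful rbd_dev_parent_get()
 * must be matched by an rbd_dev_parent_put() once the parent I/O is
 * done. rbd_img_request_create()/rbd_img_request_destroy() below do
 * exactly this via the IMG_REQ_LAYERED flag. demo_* is hypothetical.
 */
static void __maybe_unused demo_parent_ref_pairing(struct rbd_device *rbd_dev)
{
	if (rbd_dev_parent_get(rbd_dev)) {
		/* parent_spec/parent_overlap are stable in here */
		rbd_dev_parent_put(rbd_dev);
	}
}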
2181/*
2182 * Caller is responsible for filling in the list of object requests
2183 * that comprises the image request, and the Linux request pointer
2184 * (if there is one).
2185 */
2186static struct rbd_img_request *rbd_img_request_create(
2187 struct rbd_device *rbd_dev,
bf0d5f50 2188 u64 offset, u64 length,
6d2940c8 2189 enum obj_operation_type op_type,
4e752f0a 2190 struct ceph_snap_context *snapc)
2191{
2192 struct rbd_img_request *img_request;
bf0d5f50 2193
7a716aac 2194 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2195 if (!img_request)
2196 return NULL;
2197
2198 img_request->rq = NULL;
2199 img_request->rbd_dev = rbd_dev;
2200 img_request->offset = offset;
2201 img_request->length = length;
0c425248 2202 img_request->flags = 0;
2203 if (op_type == OBJ_OP_DISCARD) {
2204 img_request_discard_set(img_request);
2205 img_request->snapc = snapc;
2206 } else if (op_type == OBJ_OP_WRITE) {
0c425248 2207 img_request_write_set(img_request);
4e752f0a 2208 img_request->snapc = snapc;
0c425248 2209 } else {
bf0d5f50 2210 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 2211 }
a2acd00e 2212 if (rbd_dev_parent_get(rbd_dev))
d0b2e944 2213 img_request_layered_set(img_request);
2214 spin_lock_init(&img_request->completion_lock);
2215 img_request->next_completion = 0;
2216 img_request->callback = NULL;
a5a337d4 2217 img_request->result = 0;
2218 img_request->obj_request_count = 0;
2219 INIT_LIST_HEAD(&img_request->obj_requests);
2220 kref_init(&img_request->kref);
2221
37206ee5 2222 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
6d2940c8 2223 obj_op_name(op_type), offset, length, img_request);
37206ee5 2224
2225 return img_request;
2226}
2227
2228static void rbd_img_request_destroy(struct kref *kref)
2229{
2230 struct rbd_img_request *img_request;
2231 struct rbd_obj_request *obj_request;
2232 struct rbd_obj_request *next_obj_request;
2233
2234 img_request = container_of(kref, struct rbd_img_request, kref);
2235
2236 dout("%s: img %p\n", __func__, img_request);
2237
2238 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2239 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 2240 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 2241
2242 if (img_request_layered_test(img_request)) {
2243 img_request_layered_clear(img_request);
2244 rbd_dev_parent_put(img_request->rbd_dev);
2245 }
2246
2247 if (img_request_write_test(img_request) ||
2248 img_request_discard_test(img_request))
812164f8 2249 ceph_put_snap_context(img_request->snapc);
bf0d5f50 2250
1c2a9dfe 2251 kmem_cache_free(rbd_img_request_cache, img_request);
2252}
2253
2254static struct rbd_img_request *rbd_parent_request_create(
2255 struct rbd_obj_request *obj_request,
2256 u64 img_offset, u64 length)
2257{
2258 struct rbd_img_request *parent_request;
2259 struct rbd_device *rbd_dev;
2260
2261 rbd_assert(obj_request->img_request);
2262 rbd_dev = obj_request->img_request->rbd_dev;
2263
4e752f0a 2264 parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
6d2940c8 2265 length, OBJ_OP_READ, NULL);
2266 if (!parent_request)
2267 return NULL;
2268
2269 img_request_child_set(parent_request);
2270 rbd_obj_request_get(obj_request);
2271 parent_request->obj_request = obj_request;
2272
2273 return parent_request;
2274}
2275
2276static void rbd_parent_request_destroy(struct kref *kref)
2277{
2278 struct rbd_img_request *parent_request;
2279 struct rbd_obj_request *orig_request;
2280
2281 parent_request = container_of(kref, struct rbd_img_request, kref);
2282 orig_request = parent_request->obj_request;
2283
2284 parent_request->obj_request = NULL;
2285 rbd_obj_request_put(orig_request);
2286 img_request_child_clear(parent_request);
2287
2288 rbd_img_request_destroy(kref);
2289}
2290
2291static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2292{
6365d33a 2293 struct rbd_img_request *img_request;
2294 unsigned int xferred;
2295 int result;
8b3e1a56 2296 bool more;
1217857f 2297
2298 rbd_assert(obj_request_img_data_test(obj_request));
2299 img_request = obj_request->img_request;
2300
2301 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2302 xferred = (unsigned int)obj_request->xferred;
2303 result = obj_request->result;
2304 if (result) {
2305 struct rbd_device *rbd_dev = img_request->rbd_dev;
2306 enum obj_operation_type op_type;
2307
2308 if (img_request_discard_test(img_request))
2309 op_type = OBJ_OP_DISCARD;
2310 else if (img_request_write_test(img_request))
2311 op_type = OBJ_OP_WRITE;
2312 else
2313 op_type = OBJ_OP_READ;
1217857f 2314
9584d508 2315 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2316 obj_op_name(op_type), obj_request->length,
2317 obj_request->img_offset, obj_request->offset);
9584d508 2318 rbd_warn(rbd_dev, " result %d xferred %x",
2319 result, xferred);
2320 if (!img_request->result)
2321 img_request->result = result;
2322 /*
2323 * Need to end I/O on the entire obj_request worth of
2324 * bytes in case of error.
2325 */
2326 xferred = obj_request->length;
2327 }
2328
2329 if (img_request_child_test(img_request)) {
2330 rbd_assert(img_request->obj_request != NULL);
2331 more = obj_request->which < img_request->obj_request_count - 1;
2332 } else {
2333 rbd_assert(img_request->rq != NULL);
2334
2335 more = blk_update_request(img_request->rq, result, xferred);
2336 if (!more)
2337 __blk_mq_end_request(img_request->rq, result);
2338 }
2339
2340 return more;
2341}
2342
2343static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2344{
2345 struct rbd_img_request *img_request;
2346 u32 which = obj_request->which;
2347 bool more = true;
2348
6365d33a 2349 rbd_assert(obj_request_img_data_test(obj_request));
2350 img_request = obj_request->img_request;
2351
2352 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2353 rbd_assert(img_request != NULL);
2354 rbd_assert(img_request->obj_request_count > 0);
2355 rbd_assert(which != BAD_WHICH);
2356 rbd_assert(which < img_request->obj_request_count);
2357
2358 spin_lock_irq(&img_request->completion_lock);
2359 if (which != img_request->next_completion)
2360 goto out;
2361
2362 for_each_obj_request_from(img_request, obj_request) {
2363 rbd_assert(more);
2364 rbd_assert(which < img_request->obj_request_count);
2365
2366 if (!obj_request_done_test(obj_request))
2367 break;
1217857f 2368 more = rbd_img_obj_end_request(obj_request);
2369 which++;
2370 }
2371
2372 rbd_assert(more ^ (which == img_request->obj_request_count));
2373 img_request->next_completion = which;
2374out:
2375 spin_unlock_irq(&img_request->completion_lock);
0f2d5be7 2376 rbd_img_request_put(img_request);
2377
2378 if (!more)
2379 rbd_img_request_complete(img_request);
2380}
2381
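/*
 * Editor's illustrative sketch -- not part of rbd.c. The in-order
 * completion walk above, in userspace: object requests may finish out
 * of order, but next_completion only advances over a contiguous prefix
 * of completed requests, so completions are reported to the block
 * layer in submission order.
 */
#include <stdbool.h>
#include <stdio.h>

int main(void)
{
	bool done[] = { true, true, false, true, false };
	unsigned int count = sizeof(done) / sizeof(done[0]);
	unsigned int next_completion = 0;

	while (next_completion < count && done[next_completion])
		next_completion++;
	printf("completed through %u of %u\n", next_completion, count);
	return 0;	/* prints "completed through 2 of 5" */
}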
2382/*
2383 * Add individual osd ops to the given ceph_osd_request and prepare
2384 * them for submission. num_ops is the current number of
2385 * osd operations already on the object request.
2386 */
2387static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2388 struct ceph_osd_request *osd_request,
2389 enum obj_operation_type op_type,
2390 unsigned int num_ops)
2391{
2392 struct rbd_img_request *img_request = obj_request->img_request;
2393 struct rbd_device *rbd_dev = img_request->rbd_dev;
2394 u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2395 u64 offset = obj_request->offset;
2396 u64 length = obj_request->length;
2397 u64 img_end;
2398 u16 opcode;
2399
2400 if (op_type == OBJ_OP_DISCARD) {
2401 if (!offset && length == object_size &&
2402 (!img_request_layered_test(img_request) ||
2403 !obj_request_overlaps_parent(obj_request))) {
2404 opcode = CEPH_OSD_OP_DELETE;
2405 } else if ((offset + length == object_size)) {
2406 opcode = CEPH_OSD_OP_TRUNCATE;
2407 } else {
2408 down_read(&rbd_dev->header_rwsem);
2409 img_end = rbd_dev->header.image_size;
2410 up_read(&rbd_dev->header_rwsem);
2411
2412 if (obj_request->img_offset + length == img_end)
2413 opcode = CEPH_OSD_OP_TRUNCATE;
2414 else
2415 opcode = CEPH_OSD_OP_ZERO;
2416 }
2417 } else if (op_type == OBJ_OP_WRITE) {
2418 if (!offset && length == object_size)
2419 opcode = CEPH_OSD_OP_WRITEFULL;
2420 else
2421 opcode = CEPH_OSD_OP_WRITE;
2422 osd_req_op_alloc_hint_init(osd_request, num_ops,
2423 object_size, object_size);
2424 num_ops++;
2425 } else {
2426 opcode = CEPH_OSD_OP_READ;
2427 }
2428
7e868b6e 2429 if (opcode == CEPH_OSD_OP_DELETE)
144cba14 2430 osd_req_op_init(osd_request, num_ops, opcode, 0);
2431 else
2432 osd_req_op_extent_init(osd_request, num_ops, opcode,
2433 offset, length, 0, 0);
2434
2435 if (obj_request->type == OBJ_REQUEST_BIO)
2436 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2437 obj_request->bio_list, length);
2438 else if (obj_request->type == OBJ_REQUEST_PAGES)
2439 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2440 obj_request->pages, length,
2441 offset & ~PAGE_MASK, false, false);
2442
2443 /* Discards are also writes */
2444 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2445 rbd_osd_req_format_write(obj_request);
2446 else
2447 rbd_osd_req_format_read(obj_request);
2448}
2449
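/*
 * Editor's illustrative sketch -- not part of rbd.c. The discard
 * opcode ladder above as a pure userspace function. whole_object_ok
 * stands for "not a layered request, or no parent overlap for this
 * object"; img_end is the current image size. demo_* names are
 * hypothetical.
 */
#include <stdbool.h>
#include <stdint.h>

enum demo_op { DEMO_DELETE, DEMO_TRUNCATE, DEMO_ZERO };

static enum demo_op demo_discard_op(uint64_t offset, uint64_t length,
				    uint64_t object_size,
				    bool whole_object_ok,
				    uint64_t img_offset, uint64_t img_end)
{
	if (!offset && length == object_size && whole_object_ok)
		return DEMO_DELETE;		/* drop the whole object */
	if (offset + length == object_size)
		return DEMO_TRUNCATE;		/* runs to end of object */
	if (img_offset + length == img_end)
		return DEMO_TRUNCATE;		/* runs to end of image */
	return DEMO_ZERO;			/* punch a hole inside */
}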
2450/*
2451 * Split up an image request into one or more object requests, each
2452 * to a different object. The "type" parameter indicates whether
2453 * "data_desc" is the pointer to the head of a list of bio
2454 * structures, or the base of a page array. In either case this
2455 * function assumes data_desc describes memory sufficient to hold
2456 * all data described by the image request.
2457 */
2458static int rbd_img_request_fill(struct rbd_img_request *img_request,
2459 enum obj_request_type type,
2460 void *data_desc)
2461{
2462 struct rbd_device *rbd_dev = img_request->rbd_dev;
2463 struct rbd_obj_request *obj_request = NULL;
2464 struct rbd_obj_request *next_obj_request;
a158073c 2465 struct bio *bio_list = NULL;
f1a4739f 2466 unsigned int bio_offset = 0;
a158073c 2467 struct page **pages = NULL;
6d2940c8 2468 enum obj_operation_type op_type;
7da22d29 2469 u64 img_offset;
bf0d5f50 2470 u64 resid;
bf0d5f50 2471
2472 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2473 (int)type, data_desc);
37206ee5 2474
7da22d29 2475 img_offset = img_request->offset;
bf0d5f50 2476 resid = img_request->length;
4dda41d3 2477 rbd_assert(resid > 0);
3b434a2a 2478 op_type = rbd_img_request_op_type(img_request);
2479
2480 if (type == OBJ_REQUEST_BIO) {
2481 bio_list = data_desc;
2482 rbd_assert(img_offset ==
2483 bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
90e98c52 2484 } else if (type == OBJ_REQUEST_PAGES) {
2485 pages = data_desc;
2486 }
2487
bf0d5f50 2488 while (resid) {
2fa12320 2489 struct ceph_osd_request *osd_req;
bf0d5f50 2490 const char *object_name;
2491 u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2492 u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 2493
7da22d29 2494 object_name = rbd_segment_name(rbd_dev, img_offset);
2495 if (!object_name)
2496 goto out_unwind;
67e2b652 2497 obj_request = rbd_obj_request_create(object_name, type);
2498 /* object request has its own copy of the object name */
2499 rbd_segment_name_free(object_name);
2500 if (!obj_request)
2501 goto out_unwind;
62054da6 2502
2503 obj_request->offset = offset;
2504 obj_request->length = length;
2505
2506 /*
2507 * set obj_request->img_request before creating the
2508 * osd_request so that it gets the right snapc
2509 */
2510 rbd_img_obj_request_add(img_request, obj_request);
bf0d5f50 2511
2512 if (type == OBJ_REQUEST_BIO) {
2513 unsigned int clone_size;
2514
2515 rbd_assert(length <= (u64)UINT_MAX);
2516 clone_size = (unsigned int)length;
2517 obj_request->bio_list =
2518 bio_chain_clone_range(&bio_list,
2519 &bio_offset,
2520 clone_size,
2224d879 2521 GFP_NOIO);
f1a4739f 2522 if (!obj_request->bio_list)
62054da6 2523 goto out_unwind;
90e98c52 2524 } else if (type == OBJ_REQUEST_PAGES) {
2525 unsigned int page_count;
2526
2527 obj_request->pages = pages;
2528 page_count = (u32)calc_pages_for(offset, length);
2529 obj_request->page_count = page_count;
2530 if ((offset + length) & ~PAGE_MASK)
2531 page_count--; /* more on last page */
2532 pages += page_count;
2533 }
bf0d5f50 2534
2535 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2536 (op_type == OBJ_OP_WRITE) ? 2 : 1,
2537 obj_request);
2fa12320 2538 if (!osd_req)
62054da6 2539 goto out_unwind;
3b434a2a 2540
2fa12320 2541 obj_request->osd_req = osd_req;
2169238d 2542 obj_request->callback = rbd_img_obj_callback;
3b434a2a 2543 obj_request->img_offset = img_offset;
9d4df01f 2544
3b434a2a 2545 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
430c28c3 2546
7da22d29 2547 img_offset += length;
2548 resid -= length;
2549 }
2550
2551 return 0;
2552
2553out_unwind:
2554 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
42dd037c 2555 rbd_img_obj_request_del(img_request, obj_request);
2556
2557 return -ENOMEM;
2558}
2559
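/*
 * Editor's illustrative sketch -- not part of rbd.c. How the fill loop
 * above walks an image extent object by object, assuming a power-of-two
 * object size (rbd object sizes are 1 << obj_order). The kernel helpers
 * are rbd_segment_offset()/rbd_segment_length(); the demo arithmetic is
 * an assumption-equivalent for power-of-two sizes.
 */
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

int main(void)
{
	const uint64_t object_size = 1ULL << 22;	/* 4 MiB default */
	uint64_t img_offset = object_size - 4096;	/* straddles objects */
	uint64_t resid = 16384;

	while (resid) {
		uint64_t offset = img_offset & (object_size - 1);
		uint64_t length = object_size - offset;

		if (length > resid)
			length = resid;
		printf("obj %" PRIu64 ": %" PRIu64 "~%" PRIu64 "\n",
		       img_offset / object_size, offset, length);
		img_offset += length;
		resid -= length;
	}
	return 0;
}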
0eefd470 2560static void
2761713d 2561rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2562{
2563 struct rbd_img_request *img_request;
2564 struct rbd_device *rbd_dev;
ebda6408 2565 struct page **pages;
0eefd470
AE
2566 u32 page_count;
2567
2568 dout("%s: obj %p\n", __func__, obj_request);
2569
2570 rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2571 obj_request->type == OBJ_REQUEST_NODATA);
2572 rbd_assert(obj_request_img_data_test(obj_request));
2573 img_request = obj_request->img_request;
2574 rbd_assert(img_request);
2575
2576 rbd_dev = img_request->rbd_dev;
2577 rbd_assert(rbd_dev);
0eefd470 2578
2579 pages = obj_request->copyup_pages;
2580 rbd_assert(pages != NULL);
0eefd470 2581 obj_request->copyup_pages = NULL;
2582 page_count = obj_request->copyup_page_count;
2583 rbd_assert(page_count);
2584 obj_request->copyup_page_count = 0;
2585 ceph_release_page_vector(pages, page_count);
2586
2587 /*
2588 * We want the transfer count to reflect the size of the
2589 * original write request. There is no such thing as a
2590 * successful short write, so if the request was successful
2591 * we can just set it to the originally-requested length.
2592 */
2593 if (!obj_request->result)
2594 obj_request->xferred = obj_request->length;
2595
2761713d 2596 obj_request_done_set(obj_request);
2597}
2598
2599static void
2600rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2601{
2602 struct rbd_obj_request *orig_request;
0eefd470 2603 struct ceph_osd_request *osd_req;
0eefd470 2604 struct rbd_device *rbd_dev;
3d7efd18 2605 struct page **pages;
d3246fb0 2606 enum obj_operation_type op_type;
ebda6408 2607 u32 page_count;
bbea1c1a 2608 int img_result;
ebda6408 2609 u64 parent_length;
2610
2611 rbd_assert(img_request_child_test(img_request));
2612
2613 /* First get what we need from the image request */
2614
2615 pages = img_request->copyup_pages;
2616 rbd_assert(pages != NULL);
2617 img_request->copyup_pages = NULL;
2618 page_count = img_request->copyup_page_count;
2619 rbd_assert(page_count);
2620 img_request->copyup_page_count = 0;
2621
2622 orig_request = img_request->obj_request;
2623 rbd_assert(orig_request != NULL);
b91f09f1 2624 rbd_assert(obj_request_type_valid(orig_request->type));
bbea1c1a 2625 img_result = img_request->result;
ebda6408 2626 parent_length = img_request->length;
fa355112 2627 rbd_assert(img_result || parent_length == img_request->xferred);
91c6febb 2628 rbd_img_request_put(img_request);
3d7efd18 2629
2630 rbd_assert(orig_request->img_request);
2631 rbd_dev = orig_request->img_request->rbd_dev;
0eefd470 2632 rbd_assert(rbd_dev);
0eefd470 2633
2634 /*
2635 * If the overlap has become 0 (most likely because the
2636 * image has been flattened) we need to free the pages
2637 * and re-submit the original write request.
2638 */
2639 if (!rbd_dev->parent_overlap) {
bbea1c1a 2640 ceph_release_page_vector(pages, page_count);
2641 rbd_obj_request_submit(orig_request);
2642 return;
bbea1c1a 2643 }
0eefd470 2644
bbea1c1a 2645 if (img_result)
0eefd470 2646 goto out_err;
0eefd470 2647
2648 /*
2649 * The original osd request is of no use to us any more.
0ccd5926 2650 * We need a new one that can hold the three ops in a copyup
2651 * request. Allocate the new copyup osd request for the
2652 * original request, and release the old one.
2653 */
bbea1c1a 2654 img_result = -ENOMEM;
2655 osd_req = rbd_osd_req_create_copyup(orig_request);
2656 if (!osd_req)
2657 goto out_err;
8785b1d4 2658 rbd_osd_req_destroy(orig_request->osd_req);
2659 orig_request->osd_req = osd_req;
2660 orig_request->copyup_pages = pages;
ebda6408 2661 orig_request->copyup_page_count = page_count;
3d7efd18 2662
0eefd470 2663 /* Initialize the copyup op */
3d7efd18 2664
0eefd470 2665 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
ebda6408 2666 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
0eefd470 2667 false, false);
3d7efd18 2668
d3246fb0 2669 /* Add the other op(s) */
0eefd470 2670
2671 op_type = rbd_img_request_op_type(orig_request->img_request);
2672 rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2673
2674 /* All set, send it off. */
2675
2676 rbd_obj_request_submit(orig_request);
2677 return;
0eefd470 2678
0eefd470 2679out_err:
fa355112 2680 ceph_release_page_vector(pages, page_count);
0dcc685e 2681 rbd_obj_request_error(orig_request, img_result);
2682}
2683
2684/*
2685 * Read from the parent image the range of data that covers the
2686 * entire target of the given object request. This is used for
2687 * satisfying a layered image write request when the target of an
2688 * object request from the image request does not exist.
2689 *
2690 * A page array big enough to hold the returned data is allocated
2691 * and supplied to rbd_img_request_fill() as the "data descriptor."
2692 * When the read completes, this page array will be transferred to
2693 * the original object request for the copyup operation.
2694 *
2695 * If an error occurs, it is recorded as the result of the original
2696 * object request in rbd_img_obj_exists_callback().
2697 */
2698static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2699{
058aa991 2700 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
3d7efd18 2701 struct rbd_img_request *parent_request = NULL;
2702 u64 img_offset;
2703 u64 length;
2704 struct page **pages = NULL;
2705 u32 page_count;
2706 int result;
2707
2708 rbd_assert(rbd_dev->parent != NULL);
2709
2710 /*
2711 * Determine the byte range covered by the object in the
2712 * child image to which the original request was to be sent.
2713 */
2714 img_offset = obj_request->img_offset - obj_request->offset;
5bc3fb17 2715 length = rbd_obj_bytes(&rbd_dev->header);
3d7efd18 2716
2717 /*
2718 * There is no defined parent data beyond the parent
2719 * overlap, so limit what we read at that boundary if
2720 * necessary.
2721 */
2722 if (img_offset + length > rbd_dev->parent_overlap) {
2723 rbd_assert(img_offset < rbd_dev->parent_overlap);
2724 length = rbd_dev->parent_overlap - img_offset;
2725 }
2726
2727 /*
2728 * Allocate a page array big enough to receive the data read
2729 * from the parent.
2730 */
2731 page_count = (u32)calc_pages_for(0, length);
2732 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2733 if (IS_ERR(pages)) {
2734 result = PTR_ERR(pages);
2735 pages = NULL;
2736 goto out_err;
2737 }
2738
2739 result = -ENOMEM;
2740 parent_request = rbd_parent_request_create(obj_request,
2741 img_offset, length);
2742 if (!parent_request)
2743 goto out_err;
2744
2745 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2746 if (result)
2747 goto out_err;
058aa991 2748
3d7efd18 2749 parent_request->copyup_pages = pages;
ebda6408 2750 parent_request->copyup_page_count = page_count;
3d7efd18 2751 parent_request->callback = rbd_img_obj_parent_read_full_callback;
058aa991 2752
2753 result = rbd_img_request_submit(parent_request);
2754 if (!result)
2755 return 0;
2756
2757 parent_request->copyup_pages = NULL;
ebda6408 2758 parent_request->copyup_page_count = 0;
2759 parent_request->obj_request = NULL;
2760 rbd_obj_request_put(obj_request);
2761out_err:
2762 if (pages)
2763 ceph_release_page_vector(pages, page_count);
2764 if (parent_request)
2765 rbd_img_request_put(parent_request);
2766 return result;
2767}
2768
2769static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2770{
c5b5ef6c 2771 struct rbd_obj_request *orig_request;
638f5abe 2772 struct rbd_device *rbd_dev;
2773 int result;
2774
2775 rbd_assert(!obj_request_img_data_test(obj_request));
2776
2777 /*
2778 * All we need from the object request is the original
2779 * request and the result of the STAT op. Grab those, then
2780 * we're done with the request.
2781 */
2782 orig_request = obj_request->obj_request;
2783 obj_request->obj_request = NULL;
912c317d 2784 rbd_obj_request_put(orig_request);
2785 rbd_assert(orig_request);
2786 rbd_assert(orig_request->img_request);
2787
2788 result = obj_request->result;
2789 obj_request->result = 0;
2790
2791 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2792 obj_request, orig_request, result,
2793 obj_request->xferred, obj_request->length);
2794 rbd_obj_request_put(obj_request);
2795
2796 /*
2797 * If the overlap has become 0 (most likely because the
2798 * image has been flattened) we need to re-submit the
2799 * original request.
2800 */
2801 rbd_dev = orig_request->img_request->rbd_dev;
2802 if (!rbd_dev->parent_overlap) {
2803 rbd_obj_request_submit(orig_request);
2804 return;
638f5abe 2805 }
2806
2807 /*
2808 * Our only purpose here is to determine whether the object
2809 * exists, and we don't want to treat the non-existence as
2810 * an error. If something else comes back, transfer the
2811 * error to the original request and complete it now.
2812 */
2813 if (!result) {
2814 obj_request_existence_set(orig_request, true);
2815 } else if (result == -ENOENT) {
2816 obj_request_existence_set(orig_request, false);
2817 } else {
2818 goto fail_orig_request;
2819 }
2820
2821 /*
2822 * Resubmit the original request now that we have recorded
2823 * whether the target object exists.
2824 */
2825 result = rbd_img_obj_request_submit(orig_request);
2826 if (result)
2827 goto fail_orig_request;
2828
2829 return;
2830
2831fail_orig_request:
0dcc685e 2832 rbd_obj_request_error(orig_request, result);
2833}
2834
2835static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2836{
058aa991 2837 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
c5b5ef6c 2838 struct rbd_obj_request *stat_request;
710214e3 2839 struct page **pages;
2840 u32 page_count;
2841 size_t size;
2842 int ret;
2843
67e2b652 2844 stat_request = rbd_obj_request_create(obj_request->object_name,
2845 OBJ_REQUEST_PAGES);
2846 if (!stat_request)
2847 return -ENOMEM;
2848
2849 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2850 stat_request);
2851 if (!stat_request->osd_req) {
2852 ret = -ENOMEM;
2853 goto fail_stat_request;
2854 }
2855
2856 /*
2857 * The response data for a STAT call consists of:
2858 * le64 length;
2859 * struct {
2860 * le32 tv_sec;
2861 * le32 tv_nsec;
2862 * } mtime;
2863 */
2864 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2865 page_count = (u32)calc_pages_for(0, size);
2866 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2867 if (IS_ERR(pages)) {
2868 ret = PTR_ERR(pages);
2869 goto fail_stat_request;
2870 }
c5b5ef6c 2871
2872 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2873 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2874 false, false);
2875
2876 rbd_obj_request_get(obj_request);
2877 stat_request->obj_request = obj_request;
2878 stat_request->pages = pages;
2879 stat_request->page_count = page_count;
c5b5ef6c
AE
2880 stat_request->callback = rbd_img_obj_exists_callback;
2881
2882 rbd_obj_request_submit(stat_request);
2883 return 0;
c5b5ef6c 2884
2885fail_stat_request:
2886 rbd_obj_request_put(stat_request);
2887 return ret;
2888}
2889
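/*
 * Editor's illustrative sketch -- not part of rbd.c. Decoding the
 * 16-byte little-endian STAT reply laid out above (le64 length, then
 * le32 tv_sec and le32 tv_nsec) from a flat buffer. demo_* names are
 * hypothetical.
 */
#include <stdint.h>
#include <stdio.h>

static uint64_t demo_le64(const uint8_t *p)
{
	uint64_t v = 0;

	for (int i = 7; i >= 0; i--)
		v = (v << 8) | p[i];
	return v;
}

static uint32_t demo_le32(const uint8_t *p)
{
	return (uint32_t)p[0] | (uint32_t)p[1] << 8 |
	       (uint32_t)p[2] << 16 | (uint32_t)p[3] << 24;
}

int main(void)
{
	uint8_t reply[16] = { 42 };	/* length = 42, mtime = 0 */

	printf("len %llu sec %u nsec %u\n",
	       (unsigned long long)demo_le64(reply),
	       demo_le32(reply + 8), demo_le32(reply + 12));
	return 0;
}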
70d045f6 2890static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
b454e36d 2891{
2892 struct rbd_img_request *img_request = obj_request->img_request;
2893 struct rbd_device *rbd_dev = img_request->rbd_dev;
b454e36d 2894
70d045f6 2895 /* Reads */
2896 if (!img_request_write_test(img_request) &&
2897 !img_request_discard_test(img_request))
2898 return true;
2899
2900 /* Non-layered writes */
2901 if (!img_request_layered_test(img_request))
2902 return true;
2903
b454e36d 2904 /*
2905 * Layered writes outside of the parent overlap range don't
2906 * share any data with the parent.
b454e36d 2907 */
2908 if (!obj_request_overlaps_parent(obj_request))
2909 return true;
b454e36d 2910
2911 /*
2912 * Entire-object layered writes - we will overwrite whatever
2913 * parent data there is anyway.
2914 */
2915 if (!obj_request->offset &&
2916 obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2917 return true;
2918
2919 /*
2920 * If the object is known to already exist, its parent data has
2921 * already been copied.
2922 */
2923 if (obj_request_known_test(obj_request) &&
2924 obj_request_exists_test(obj_request))
2925 return true;
2926
2927 return false;
2928}
2929
2930static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2931{
2932 rbd_assert(obj_request_img_data_test(obj_request));
2933 rbd_assert(obj_request_type_valid(obj_request->type));
2934 rbd_assert(obj_request->img_request);
b454e36d 2935
70d045f6 2936 if (img_obj_request_simple(obj_request)) {
2937 rbd_obj_request_submit(obj_request);
2938 return 0;
2939 }
2940
2941 /*
2942 * It's a layered write. The target object might exist but
2943 * we may not know that yet. If we know it doesn't exist,
2944 * start by reading the data for the full target object from
2945 * the parent so we can use it for a copyup to the target.
b454e36d 2946 */
70d045f6 2947 if (obj_request_known_test(obj_request))
2948 return rbd_img_obj_parent_read_full(obj_request);
2949
2950 /* We don't know whether the target exists. Go find out. */
2951
2952 return rbd_img_obj_exists_submit(obj_request);
2953}
2954
2955static int rbd_img_request_submit(struct rbd_img_request *img_request)
2956{
bf0d5f50 2957 struct rbd_obj_request *obj_request;
46faeed4 2958 struct rbd_obj_request *next_obj_request;
663ae2cc 2959 int ret = 0;
bf0d5f50 2960
37206ee5 2961 dout("%s: img %p\n", __func__, img_request);
bf0d5f50 2962
2963 rbd_img_request_get(img_request);
2964 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
b454e36d 2965 ret = rbd_img_obj_request_submit(obj_request);
bf0d5f50 2966 if (ret)
663ae2cc 2967 goto out_put_ireq;
2968 }
2969
2970out_put_ireq:
2971 rbd_img_request_put(img_request);
2972 return ret;
bf0d5f50 2973}
2974
2975static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2976{
2977 struct rbd_obj_request *obj_request;
2978 struct rbd_device *rbd_dev;
2979 u64 obj_end;
2980 u64 img_xferred;
2981 int img_result;
2982
2983 rbd_assert(img_request_child_test(img_request));
2984
2985 /* First get what we need from the image request and release it */
2986
8b3e1a56 2987 obj_request = img_request->obj_request;
2988 img_xferred = img_request->xferred;
2989 img_result = img_request->result;
2990 rbd_img_request_put(img_request);
2991
2992 /*
2993 * If the overlap has become 0 (most likely because the
2994 * image has been flattened) we need to re-submit the
2995 * original request.
2996 */
2997 rbd_assert(obj_request);
2998 rbd_assert(obj_request->img_request);
2999 rbd_dev = obj_request->img_request->rbd_dev;
3000 if (!rbd_dev->parent_overlap) {
3001 rbd_obj_request_submit(obj_request);
3002 return;
02c74fba 3003 }
a9e8ba2c 3004
02c74fba 3005 obj_request->result = img_result;
3006 if (obj_request->result)
3007 goto out;
3008
3009 /*
3010 * We need to zero anything beyond the parent overlap
3011 * boundary. Since rbd_img_obj_request_read_callback()
3012 * will zero anything beyond the end of a short read, an
3013 * easy way to do this is to pretend the data from the
3014 * parent came up short--ending at the overlap boundary.
3015 */
3016 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
3017 obj_end = obj_request->img_offset + obj_request->length;
3018 if (obj_end > rbd_dev->parent_overlap) {
3019 u64 xferred = 0;
3020
3021 if (obj_request->img_offset < rbd_dev->parent_overlap)
3022 xferred = rbd_dev->parent_overlap -
3023 obj_request->img_offset;
8b3e1a56 3024
02c74fba 3025 obj_request->xferred = min(img_xferred, xferred);
a9e8ba2c 3026 } else {
02c74fba 3027 obj_request->xferred = img_xferred;
3028 }
3029out:
3030 rbd_img_obj_request_read_callback(obj_request);
3031 rbd_obj_request_complete(obj_request);
3032}
3033
3034static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
3035{
3036 struct rbd_img_request *img_request;
3037 int result;
3038
3039 rbd_assert(obj_request_img_data_test(obj_request));
3040 rbd_assert(obj_request->img_request != NULL);
3041 rbd_assert(obj_request->result == (s32) -ENOENT);
5b2ab72d 3042 rbd_assert(obj_request_type_valid(obj_request->type));
8b3e1a56 3043
8b3e1a56 3044 /* rbd_read_finish(obj_request, obj_request->length); */
e93f3152 3045 img_request = rbd_parent_request_create(obj_request,
8b3e1a56 3046 obj_request->img_offset,
e93f3152 3047 obj_request->length);
3048 result = -ENOMEM;
3049 if (!img_request)
3050 goto out_err;
3051
3052 if (obj_request->type == OBJ_REQUEST_BIO)
3053 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3054 obj_request->bio_list);
3055 else
3056 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3057 obj_request->pages);
3058 if (result)
3059 goto out_err;
3060
3061 img_request->callback = rbd_img_parent_read_callback;
3062 result = rbd_img_request_submit(img_request);
3063 if (result)
3064 goto out_err;
3065
3066 return;
3067out_err:
3068 if (img_request)
3069 rbd_img_request_put(img_request);
3070 obj_request->result = result;
3071 obj_request->xferred = 0;
3072 obj_request_done_set(obj_request);
3073}
bf0d5f50 3074
ed95b21a 3075static const struct rbd_client_id rbd_empty_cid;
b8d70035 3076
3077static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3078 const struct rbd_client_id *rhs)
3079{
3080 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3081}
3082
3083static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3084{
3085 struct rbd_client_id cid;
3086
3087 mutex_lock(&rbd_dev->watch_mutex);
3088 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3089 cid.handle = rbd_dev->watch_cookie;
3090 mutex_unlock(&rbd_dev->watch_mutex);
3091 return cid;
3092}
3093
3094/*
3095 * lock_rwsem must be held for write
3096 */
3097static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3098 const struct rbd_client_id *cid)
3099{
3100 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3101 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3102 cid->gid, cid->handle);
3103 rbd_dev->owner_cid = *cid; /* struct */
3104}
3105
3106static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3107{
3108 mutex_lock(&rbd_dev->watch_mutex);
3109 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3110 mutex_unlock(&rbd_dev->watch_mutex);
3111}
3112
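/*
 * Editor's illustrative sketch -- not part of rbd.c. The lock cookie
 * is "<prefix> <watch cookie>"; find_watcher() below recovers the
 * numeric half with a matching sscanf(). DEMO_PREFIX stands in for
 * RBD_LOCK_COOKIE_PREFIX and the handle value is made up.
 */
#include <stdio.h>

#define DEMO_PREFIX "auto"

int main(void)
{
	char cookie[32];
	unsigned long long handle = 0;

	snprintf(cookie, sizeof(cookie), "%s %llu", DEMO_PREFIX, 12345ULL);
	sscanf(cookie, DEMO_PREFIX " %llu", &handle);
	printf("cookie \"%s\" -> handle %llu\n", cookie, handle);
	return 0;
}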
3113/*
3114 * lock_rwsem must be held for write
3115 */
3116static int rbd_lock(struct rbd_device *rbd_dev)
b8d70035 3117{
922dab61 3118 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3119 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3120 char cookie[32];
e627db08 3121 int ret;
b8d70035 3122
ed95b21a 3123 WARN_ON(__rbd_is_lock_owner(rbd_dev));
52bb1f9b 3124
3125 format_lock_cookie(rbd_dev, cookie);
3126 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3127 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3128 RBD_LOCK_TAG, "", 0);
e627db08 3129 if (ret)
ed95b21a 3130 return ret;
b8d70035 3131
3132 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3133 rbd_set_owner_cid(rbd_dev, &cid);
3134 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3135 return 0;
3136}
3137
3138/*
3139 * lock_rwsem must be held for write
3140 */
3141static int rbd_unlock(struct rbd_device *rbd_dev)
bb040aa0 3142{
922dab61 3143 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a 3144 char cookie[32];
3145 int ret;
3146
ed95b21a 3147 WARN_ON(!__rbd_is_lock_owner(rbd_dev));
bb040aa0 3148
ed95b21a 3149 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
bb040aa0 3150
3151 format_lock_cookie(rbd_dev, cookie);
3152 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3153 RBD_LOCK_NAME, cookie);
3154 if (ret && ret != -ENOENT) {
3155 rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
3156 return ret;
3157 }
3158
3159 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3160 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3161 return 0;
3162}
3163
3164static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3165 enum rbd_notify_op notify_op,
3166 struct page ***preply_pages,
3167 size_t *preply_len)
3168{
3169 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3170 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3171 int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3172 char buf[buf_size];
3173 void *p = buf;
9969ebc5 3174
ed95b21a 3175 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
9969ebc5 3176
3177 /* encode *LockPayload NotifyMessage (op + ClientId) */
3178 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3179 ceph_encode_32(&p, notify_op);
3180 ceph_encode_64(&p, cid.gid);
3181 ceph_encode_64(&p, cid.handle);
8eb87565 3182
3183 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3184 &rbd_dev->header_oloc, buf, buf_size,
3185 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3186}
3187
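/*
 * Editor's illustrative sketch -- not part of rbd.c. The NotifyMessage
 * payload built above, byte for byte, in userspace. This assumes the
 * ceph encoding header that CEPH_ENCODING_START_BLK_LEN accounts for
 * is 6 bytes (u8 struct_v, u8 compat, le32 payload length); op and
 * client id follow as le32/le64. demo_* names and values are made up.
 */
#include <stdint.h>

static uint8_t *demo_put_le32(uint8_t *p, uint32_t v)
{
	for (int i = 0; i < 4; i++)
		*p++ = v >> (8 * i);
	return p;
}

static uint8_t *demo_put_le64(uint8_t *p, uint64_t v)
{
	for (int i = 0; i < 8; i++)
		*p++ = v >> (8 * i);
	return p;
}

int main(void)
{
	uint8_t buf[6 + 4 + 8 + 8];	/* header + op + gid + handle */
	uint8_t *p = buf;

	*p++ = 2;			/* struct_v, as encoded above */
	*p++ = 1;			/* compat */
	p = demo_put_le32(p, 4 + 8 + 8);/* payload length */
	p = demo_put_le32(p, 0);	/* notify_op (demo value) */
	p = demo_put_le64(p, 4100);	/* cid.gid (demo value) */
	p = demo_put_le64(p, 1);	/* cid.handle (demo value) */
	return (size_t)(p - buf) == sizeof(buf) ? 0 : 1;
}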
3188static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3189 enum rbd_notify_op notify_op)
b30a01f2 3190{
3191 struct page **reply_pages;
3192 size_t reply_len;
b30a01f2 3193
3194 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3195 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3196}
b30a01f2 3197
3198static void rbd_notify_acquired_lock(struct work_struct *work)
3199{
3200 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3201 acquired_lock_work);
76756a51 3202
ed95b21a 3203 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3204}
3205
ed95b21a 3206static void rbd_notify_released_lock(struct work_struct *work)
c525f036 3207{
3208 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3209 released_lock_work);
811c6688 3210
ed95b21a 3211 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3212}
3213
ed95b21a 3214static int rbd_request_lock(struct rbd_device *rbd_dev)
36be9a76 3215{
3216 struct page **reply_pages;
3217 size_t reply_len;
3218 bool lock_owner_responded = false;
36be9a76
AE
3219 int ret;
3220
ed95b21a 3221 dout("%s rbd_dev %p\n", __func__, rbd_dev);
36be9a76 3222
3223 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3224 &reply_pages, &reply_len);
3225 if (ret && ret != -ETIMEDOUT) {
3226 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
36be9a76 3227 goto out;
ed95b21a 3228 }
36be9a76 3229
3230 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3231 void *p = page_address(reply_pages[0]);
3232 void *const end = p + reply_len;
3233 u32 n;
36be9a76 3234
3235 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3236 while (n--) {
3237 u8 struct_v;
3238 u32 len;
36be9a76 3239
3240 ceph_decode_need(&p, end, 8 + 8, e_inval);
3241 p += 8 + 8; /* skip gid and cookie */
04017e29 3242
3243 ceph_decode_32_safe(&p, end, len, e_inval);
3244 if (!len)
3245 continue;
3246
3247 if (lock_owner_responded) {
3248 rbd_warn(rbd_dev,
3249 "duplicate lock owners detected");
3250 ret = -EIO;
3251 goto out;
3252 }
3253
3254 lock_owner_responded = true;
3255 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3256 &struct_v, &len);
3257 if (ret) {
3258 rbd_warn(rbd_dev,
3259 "failed to decode ResponseMessage: %d",
3260 ret);
3261 goto e_inval;
3262 }
3263
3264 ret = ceph_decode_32(&p);
3265 }
3266 }
3267
3268 if (!lock_owner_responded) {
3269 rbd_warn(rbd_dev, "no lock owners detected");
3270 ret = -ETIMEDOUT;
3271 }
3272
3273out:
3274 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3275 return ret;
3276
3277e_inval:
3278 ret = -EINVAL;
3279 goto out;
3280}
3281
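/*
 * Editor's illustrative sketch -- not part of rbd.c. Walking one ack in
 * the notify reply decoded above: each acker contributes an 8-byte gid
 * and an 8-byte cookie (skipped), then a le32 payload length; only the
 * lock owner replies with a non-empty payload.
 */
#include <stdint.h>
#include <stdio.h>

int main(void)
{
	/* one ack: gid, cookie, then a 4-byte payload length of 0 */
	uint8_t ack[8 + 8 + 4] = { 0 };
	const uint8_t *p = ack;
	uint32_t len;

	p += 8 + 8;			/* skip gid and cookie */
	len = (uint32_t)p[0] | (uint32_t)p[1] << 8 |
	      (uint32_t)p[2] << 16 | (uint32_t)p[3] << 24;
	printf("payload len %u -> %s\n", len,
	       len ? "lock owner replied" : "empty ack");
	return 0;
}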
3282static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3283{
3284 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3285
3286 cancel_delayed_work(&rbd_dev->lock_dwork);
3287 if (wake_all)
3288 wake_up_all(&rbd_dev->lock_waitq);
3289 else
3290 wake_up(&rbd_dev->lock_waitq);
3291}
3292
3293static int get_lock_owner_info(struct rbd_device *rbd_dev,
3294 struct ceph_locker **lockers, u32 *num_lockers)
3295{
3296 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3297 u8 lock_type;
3298 char *lock_tag;
3299 int ret;
3300
3301 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3302
3303 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3304 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3305 &lock_type, &lock_tag, lockers, num_lockers);
3306 if (ret)
3307 return ret;
3308
3309 if (*num_lockers == 0) {
3310 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3311 goto out;
3312 }
3313
3314 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3315 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3316 lock_tag);
3317 ret = -EBUSY;
3318 goto out;
3319 }
3320
3321 if (lock_type == CEPH_CLS_LOCK_SHARED) {
3322 rbd_warn(rbd_dev, "shared lock type detected");
3323 ret = -EBUSY;
3324 goto out;
3325 }
3326
3327 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3328 strlen(RBD_LOCK_COOKIE_PREFIX))) {
3329 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3330 (*lockers)[0].id.cookie);
3331 ret = -EBUSY;
3332 goto out;
3333 }
3334
3335out:
3336 kfree(lock_tag);
3337 return ret;
3338}
3339
3340static int find_watcher(struct rbd_device *rbd_dev,
3341 const struct ceph_locker *locker)
3342{
3343 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3344 struct ceph_watch_item *watchers;
3345 u32 num_watchers;
3346 u64 cookie;
3347 int i;
3348 int ret;
3349
3350 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3351 &rbd_dev->header_oloc, &watchers,
3352 &num_watchers);
3353 if (ret)
3354 return ret;
3355
3356 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3357 for (i = 0; i < num_watchers; i++) {
3358 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3359 sizeof(locker->info.addr)) &&
3360 watchers[i].cookie == cookie) {
3361 struct rbd_client_id cid = {
3362 .gid = le64_to_cpu(watchers[i].name.num),
3363 .handle = cookie,
3364 };
3365
3366 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3367 rbd_dev, cid.gid, cid.handle);
3368 rbd_set_owner_cid(rbd_dev, &cid);
3369 ret = 1;
3370 goto out;
3371 }
3372 }
3373
3374 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3375 ret = 0;
3376out:
3377 kfree(watchers);
3378 return ret;
3379}
3380
3381/*
3382 * lock_rwsem must be held for write
3383 */
3384static int rbd_try_lock(struct rbd_device *rbd_dev)
3385{
3386 struct ceph_client *client = rbd_dev->rbd_client->client;
3387 struct ceph_locker *lockers;
3388 u32 num_lockers;
3389 int ret;
3390
3391 for (;;) {
3392 ret = rbd_lock(rbd_dev);
3393 if (ret != -EBUSY)
3394 return ret;
3395
3396 /* determine if the current lock holder is still alive */
3397 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3398 if (ret)
3399 return ret;
3400
3401 if (num_lockers == 0)
3402 goto again;
3403
3404 ret = find_watcher(rbd_dev, lockers);
3405 if (ret) {
3406 if (ret > 0)
3407 ret = 0; /* have to request lock */
3408 goto out;
3409 }
3410
3411 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3412 ENTITY_NAME(lockers[0].id.name));
3413
3414 ret = ceph_monc_blacklist_add(&client->monc,
3415 &lockers[0].info.addr);
3416 if (ret) {
3417 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3418 ENTITY_NAME(lockers[0].id.name), ret);
3419 goto out;
3420 }
3421
3422 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3423 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3424 lockers[0].id.cookie,
3425 &lockers[0].id.name);
3426 if (ret && ret != -ENOENT)
3427 goto out;
3428
3429again:
3430 ceph_free_lockers(lockers, num_lockers);
3431 }
3432
3433out:
3434 ceph_free_lockers(lockers, num_lockers);
3435 return ret;
3436}
3437
3438/*
3439 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3440 */
3441static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3442 int *pret)
3443{
3444 enum rbd_lock_state lock_state;
3445
3446 down_read(&rbd_dev->lock_rwsem);
3447 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3448 rbd_dev->lock_state);
3449 if (__rbd_is_lock_owner(rbd_dev)) {
3450 lock_state = rbd_dev->lock_state;
3451 up_read(&rbd_dev->lock_rwsem);
3452 return lock_state;
3453 }
3454
3455 up_read(&rbd_dev->lock_rwsem);
3456 down_write(&rbd_dev->lock_rwsem);
3457 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3458 rbd_dev->lock_state);
3459 if (!__rbd_is_lock_owner(rbd_dev)) {
3460 *pret = rbd_try_lock(rbd_dev);
3461 if (*pret)
3462 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3463 }
3464
3465 lock_state = rbd_dev->lock_state;
3466 up_write(&rbd_dev->lock_rwsem);
3467 return lock_state;
3468}
3469
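/*
 * Acquire-lock worker.  If the lock is held elsewhere, the owner is
 * asked to release it with RBD_NOTIFY_OP_REQUEST_LOCK; a timed-out
 * request is treated as a dead owner and the attempt is retried,
 * otherwise lock_dwork is rescheduled until the owner lets go.
 */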
3470static void rbd_acquire_lock(struct work_struct *work)
3471{
3472 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3473 struct rbd_device, lock_dwork);
3474 enum rbd_lock_state lock_state;
3475 int ret;
3476
3477 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3478again:
3479 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3480 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3481 if (lock_state == RBD_LOCK_STATE_LOCKED)
3482 wake_requests(rbd_dev, true);
3483 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3484 rbd_dev, lock_state, ret);
3485 return;
3486 }
3487
3488 ret = rbd_request_lock(rbd_dev);
3489 if (ret == -ETIMEDOUT) {
3490 goto again; /* treat this as a dead client */
3491 } else if (ret < 0) {
3492 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3493 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3494 RBD_RETRY_DELAY);
3495 } else {
3496 /*
3497 * lock owner acked, but resend if we don't see them
3498 * release the lock
3499 */
3500 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3501 rbd_dev);
3502 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3503 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3504 }
3505}
3506
3507/*
3508 * lock_rwsem must be held for write
3509 */
3510static bool rbd_release_lock(struct rbd_device *rbd_dev)
3511{
3512 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3513 rbd_dev->lock_state);
3514 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3515 return false;
3516
3517 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3518 downgrade_write(&rbd_dev->lock_rwsem);
52bb1f9b 3519 /*
ed95b21a 3520 * Ensure that all in-flight IO is flushed.
52bb1f9b 3521 *
3522 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3523 * may be shared with other devices.
52bb1f9b 3524 */
3525 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3526 up_read(&rbd_dev->lock_rwsem);
3527
3528 down_write(&rbd_dev->lock_rwsem);
3529 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3530 rbd_dev->lock_state);
3531 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3532 return false;
3533
3534 if (!rbd_unlock(rbd_dev))
3535 /*
3536 * Give others a chance to grab the lock - we would re-acquire
3537 * almost immediately if we got new IO during ceph_osdc_sync()
3538 * otherwise. We need to ack our own notifications, so this
3539 * lock_dwork will be requeued from rbd_wait_state_locked()
3540 * after wake_requests() in rbd_handle_released_lock().
3541 */
3542 cancel_delayed_work(&rbd_dev->lock_dwork);
3543
3544 return true;
3545}
3546
3547static void rbd_release_lock_work(struct work_struct *work)
3548{
3549 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3550 unlock_work);
3551
3552 down_write(&rbd_dev->lock_rwsem);
3553 rbd_release_lock(rbd_dev);
3554 up_write(&rbd_dev->lock_rwsem);
3555}
3556
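/*
 * Handlers for peer lock notifications.  ACQUIRED_LOCK and
 * RELEASED_LOCK keep the cached owner cid in sync and wake any
 * waiters; struct_v 1 notifications carry no cid and only trigger
 * the wakeup.
 */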
3557static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3558 void **p)
3559{
3560 struct rbd_client_id cid = { 0 };
3561
3562 if (struct_v >= 2) {
3563 cid.gid = ceph_decode_64(p);
3564 cid.handle = ceph_decode_64(p);
3565 }
3566
3567 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3568 cid.handle);
3569 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3570 down_write(&rbd_dev->lock_rwsem);
3571 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3572 /*
3573 * we already know that the remote client is
3574 * the owner
3575 */
3576 up_write(&rbd_dev->lock_rwsem);
3577 return;
3578 }
3579
3580 rbd_set_owner_cid(rbd_dev, &cid);
3581 downgrade_write(&rbd_dev->lock_rwsem);
3582 } else {
3583 down_read(&rbd_dev->lock_rwsem);
3584 }
3585
3586 if (!__rbd_is_lock_owner(rbd_dev))
3587 wake_requests(rbd_dev, false);
3588 up_read(&rbd_dev->lock_rwsem);
3589}
3590
3591static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3592 void **p)
3593{
3594 struct rbd_client_id cid = { 0 };
3595
3596 if (struct_v >= 2) {
3597 cid.gid = ceph_decode_64(p);
3598 cid.handle = ceph_decode_64(p);
3599 }
3600
3601 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3602 cid.handle);
3603 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3604 down_write(&rbd_dev->lock_rwsem);
3605 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3606 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3607 __func__, rbd_dev, cid.gid, cid.handle,
3608 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3609 up_write(&rbd_dev->lock_rwsem);
3610 return;
3611 }
3612
3613 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3614 downgrade_write(&rbd_dev->lock_rwsem);
3615 } else {
3616 down_read(&rbd_dev->lock_rwsem);
3617 }
3618
3619 if (!__rbd_is_lock_owner(rbd_dev))
3620 wake_requests(rbd_dev, false);
3621 up_read(&rbd_dev->lock_rwsem);
3622}
3623
3624static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3625 void **p)
3626{
3627 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3628 struct rbd_client_id cid = { 0 };
3629 bool need_to_send;
3630
3631 if (struct_v >= 2) {
3632 cid.gid = ceph_decode_64(p);
3633 cid.handle = ceph_decode_64(p);
3634 }
3635
3636 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3637 cid.handle);
3638 if (rbd_cid_equal(&cid, &my_cid))
3639 return false;
3640
3641 down_read(&rbd_dev->lock_rwsem);
3642 need_to_send = __rbd_is_lock_owner(rbd_dev);
3643 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3644 if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
3645 dout("%s rbd_dev %p queueing unlock_work\n", __func__,
3646 rbd_dev);
3647 queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
3648 }
3649 }
3650 up_read(&rbd_dev->lock_rwsem);
3651 return need_to_send;
3652}
3653
3654static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3655 u64 notify_id, u64 cookie, s32 *result)
3656{
3657 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3658 int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3659 char buf[buf_size];
3660 int ret;
3661
3662 if (result) {
3663 void *p = buf;
3664
3665 /* encode ResponseMessage */
3666 ceph_start_encoding(&p, 1, 1,
3667 buf_size - CEPH_ENCODING_START_BLK_LEN);
3668 ceph_encode_32(&p, *result);
3669 } else {
3670 buf_size = 0;
3671 }
b8d70035 3672
3673 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3674 &rbd_dev->header_oloc, notify_id, cookie,
ed95b21a 3675 buf, buf_size);
52bb1f9b 3676 if (ret)
3677 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3678}
3679
3680static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3681 u64 cookie)
3682{
3683 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3684 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3685}
3686
3687static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3688 u64 notify_id, u64 cookie, s32 result)
3689{
3690 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3691 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3692}
3693
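/*
 * Watch callback: decode the NotifyMessage envelope, dispatch on the
 * notify opcode and always acknowledge the notification.  An empty
 * payload is a legacy (pre-exclusive-lock) header update.
 */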
3694static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3695 u64 notifier_id, void *data, size_t data_len)
3696{
3697 struct rbd_device *rbd_dev = arg;
3698 void *p = data;
3699 void *const end = p + data_len;
d4c2269b 3700 u8 struct_v = 0;
3701 u32 len;
3702 u32 notify_op;
3703 int ret;
3704
3705 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3706 __func__, rbd_dev, cookie, notify_id, data_len);
3707 if (data_len) {
3708 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3709 &struct_v, &len);
3710 if (ret) {
3711 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3712 ret);
3713 return;
3714 }
3715
3716 notify_op = ceph_decode_32(&p);
3717 } else {
3718 /* legacy notification for header updates */
3719 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3720 len = 0;
3721 }
3722
3723 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3724 switch (notify_op) {
3725 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3726 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3727 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3728 break;
3729 case RBD_NOTIFY_OP_RELEASED_LOCK:
3730 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3731 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3732 break;
3733 case RBD_NOTIFY_OP_REQUEST_LOCK:
3734 if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
3735 /*
3736 * send ResponseMessage(0) back so the client
3737 * can detect a missing owner
3738 */
3739 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3740 cookie, 0);
3741 else
3742 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3743 break;
3744 case RBD_NOTIFY_OP_HEADER_UPDATE:
3745 ret = rbd_dev_refresh(rbd_dev);
3746 if (ret)
3747 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3748
3749 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3750 break;
3751 default:
3752 if (rbd_is_lock_owner(rbd_dev))
3753 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3754 cookie, -EOPNOTSUPP);
3755 else
3756 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3757 break;
3758 }
3759}
3760
3761static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3762
922dab61 3763static void rbd_watch_errcb(void *arg, u64 cookie, int err)
bb040aa0 3764{
922dab61 3765 struct rbd_device *rbd_dev = arg;
bb040aa0 3766
922dab61 3767 rbd_warn(rbd_dev, "encountered watch error: %d", err);
bb040aa0 3768
3769 down_write(&rbd_dev->lock_rwsem);
3770 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3771 up_write(&rbd_dev->lock_rwsem);
3772
3773 mutex_lock(&rbd_dev->watch_mutex);
3774 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3775 __rbd_unregister_watch(rbd_dev);
3776 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
bb040aa0 3777
99d16943 3778 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
bb040aa0 3779 }
99d16943 3780 mutex_unlock(&rbd_dev->watch_mutex);
3781}
3782
9969ebc5 3783/*
99d16943 3784 * watch_mutex must be locked
9969ebc5 3785 */
99d16943 3786static int __rbd_register_watch(struct rbd_device *rbd_dev)
3787{
3788 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
922dab61 3789 struct ceph_osd_linger_request *handle;
9969ebc5 3790
922dab61 3791 rbd_assert(!rbd_dev->watch_handle);
99d16943 3792 dout("%s rbd_dev %p\n", __func__, rbd_dev);
9969ebc5 3793
3794 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3795 &rbd_dev->header_oloc, rbd_watch_cb,
3796 rbd_watch_errcb, rbd_dev);
3797 if (IS_ERR(handle))
3798 return PTR_ERR(handle);
8eb87565 3799
922dab61 3800 rbd_dev->watch_handle = handle;
b30a01f2 3801 return 0;
3802}
3803
3804/*
3805 * watch_mutex must be locked
3806 */
3807static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
b30a01f2 3808{
3809 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3810 int ret;
b30a01f2 3811
3812 rbd_assert(rbd_dev->watch_handle);
3813 dout("%s rbd_dev %p\n", __func__, rbd_dev);
b30a01f2 3814
3815 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3816 if (ret)
3817 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
76756a51 3818
922dab61 3819 rbd_dev->watch_handle = NULL;
3820}
3821
3822static int rbd_register_watch(struct rbd_device *rbd_dev)
3823{
3824 int ret;
3825
3826 mutex_lock(&rbd_dev->watch_mutex);
3827 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3828 ret = __rbd_register_watch(rbd_dev);
3829 if (ret)
3830 goto out;
3831
3832 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3833 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3834
3835out:
3836 mutex_unlock(&rbd_dev->watch_mutex);
3837 return ret;
3838}
3839
3840static void cancel_tasks_sync(struct rbd_device *rbd_dev)
c525f036 3841{
3842 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3843
3844 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3845 cancel_work_sync(&rbd_dev->acquired_lock_work);
3846 cancel_work_sync(&rbd_dev->released_lock_work);
3847 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3848 cancel_work_sync(&rbd_dev->unlock_work);
3849}
3850
3851static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3852{
ed95b21a 3853 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3854 cancel_tasks_sync(rbd_dev);
3855
3856 mutex_lock(&rbd_dev->watch_mutex);
3857 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3858 __rbd_unregister_watch(rbd_dev);
3859 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3860 mutex_unlock(&rbd_dev->watch_mutex);
811c6688 3861
811c6688 3862 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3863}
3864
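/*
 * Re-establish the watch after an error.  Any exclusive lock we held
 * is released first and re-acquired once the watch is back; if we
 * were blacklisted, the device is flagged and all waiters are woken
 * so that pending requests can error out.
 */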
3865static void rbd_reregister_watch(struct work_struct *work)
3866{
3867 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3868 struct rbd_device, watch_dwork);
ed95b21a 3869 bool was_lock_owner = false;
87c0fded 3870 bool need_to_wake = false;
3871 int ret;
3872
3873 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3874
3875 down_write(&rbd_dev->lock_rwsem);
3876 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3877 was_lock_owner = rbd_release_lock(rbd_dev);
3878
99d16943 3879 mutex_lock(&rbd_dev->watch_mutex);
3880 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3881 mutex_unlock(&rbd_dev->watch_mutex);
3882 goto out;
3883 }
3884
3885 ret = __rbd_register_watch(rbd_dev);
3886 if (ret) {
3887 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4d73644b 3888 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3889 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3890 need_to_wake = true;
3891 } else {
3892 queue_delayed_work(rbd_dev->task_wq,
3893 &rbd_dev->watch_dwork,
3894 RBD_RETRY_DELAY);
3895 }
3896 mutex_unlock(&rbd_dev->watch_mutex);
3897 goto out;
3898 }
3899
87c0fded 3900 need_to_wake = true;
3901 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3902 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3903 mutex_unlock(&rbd_dev->watch_mutex);
3904
3905 ret = rbd_dev_refresh(rbd_dev);
3906 if (ret)
 3907 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3908
3909 if (was_lock_owner) {
3910 ret = rbd_try_lock(rbd_dev);
3911 if (ret)
 3912 rbd_warn(rbd_dev, "reregistration lock failed: %d",
3913 ret);
3914 }
3915
87c0fded 3916out:
ed95b21a 3917 up_write(&rbd_dev->lock_rwsem);
3918 if (need_to_wake)
3919 wake_requests(rbd_dev, true);
3920}
3921
36be9a76 3922/*
3923 * Synchronous osd object method call. Returns the number of bytes
3924 * returned in the outbound buffer, or a negative error code.
3925 */
3926static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3927 struct ceph_object_id *oid,
3928 struct ceph_object_locator *oloc,
36be9a76 3929 const char *method_name,
4157976b 3930 const void *outbound,
36be9a76 3931 size_t outbound_size,
4157976b 3932 void *inbound,
e2a58ee5 3933 size_t inbound_size)
36be9a76 3934{
3935 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3936 struct page *req_page = NULL;
3937 struct page *reply_page;
3938 int ret;
3939
3940 /*
3941 * Method calls are ultimately read operations. The result
 3942 * should be placed into the inbound buffer provided. They
3943 * also supply outbound data--parameters for the object
3944 * method. Currently if this is present it will be a
3945 * snapshot id.
36be9a76 3946 */
3947 if (outbound) {
3948 if (outbound_size > PAGE_SIZE)
3949 return -E2BIG;
36be9a76 3950
3951 req_page = alloc_page(GFP_KERNEL);
3952 if (!req_page)
3953 return -ENOMEM;
04017e29 3954
ecd4a68a 3955 memcpy(page_address(req_page), outbound, outbound_size);
04017e29 3956 }
430c28c3 3957
3958 reply_page = alloc_page(GFP_KERNEL);
3959 if (!reply_page) {
3960 if (req_page)
3961 __free_page(req_page);
3962 return -ENOMEM;
3963 }
57385b51 3964
3965 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3966 CEPH_OSD_FLAG_READ, req_page, outbound_size,
3967 reply_page, &inbound_size);
3968 if (!ret) {
3969 memcpy(inbound, page_address(reply_page), inbound_size);
3970 ret = inbound_size;
3971 }
36be9a76 3972
3973 if (req_page)
3974 __free_page(req_page);
3975 __free_page(reply_page);
3976 return ret;
3977}
3978
3979/*
3980 * lock_rwsem must be held for read
3981 */
3982static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3983{
3984 DEFINE_WAIT(wait);
3985
3986 do {
3987 /*
3988 * Note the use of mod_delayed_work() in rbd_acquire_lock()
3989 * and cancel_delayed_work() in wake_requests().
3990 */
3991 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3992 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3993 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
3994 TASK_UNINTERRUPTIBLE);
3995 up_read(&rbd_dev->lock_rwsem);
3996 schedule();
3997 down_read(&rbd_dev->lock_rwsem);
3998 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
3999 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4000
4001 finish_wait(&rbd_dev->lock_waitq, &wait);
4002}
4003
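/*
 * Per-request worker: validate the request, snapshot the header state
 * (mapping size, snap context, whether the exclusive lock is needed),
 * block until the lock is held if so, then build and submit an image
 * request.  Runs off rbd_wq so that the blk-mq dispatch path itself
 * never sleeps on the lock.
 */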
7ad18afa 4004static void rbd_queue_workfn(struct work_struct *work)
bf0d5f50 4005{
4006 struct request *rq = blk_mq_rq_from_pdu(work);
4007 struct rbd_device *rbd_dev = rq->q->queuedata;
bc1ecc65 4008 struct rbd_img_request *img_request;
4e752f0a 4009 struct ceph_snap_context *snapc = NULL;
4010 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4011 u64 length = blk_rq_bytes(rq);
6d2940c8 4012 enum obj_operation_type op_type;
4e752f0a 4013 u64 mapping_size;
80de1912 4014 bool must_be_locked;
4015 int result;
4016
4017 if (rq->cmd_type != REQ_TYPE_FS) {
4018 dout("%s: non-fs request type %d\n", __func__,
4019 (int) rq->cmd_type);
4020 result = -EIO;
4021 goto err;
4022 }
4023
c2df40df 4024 if (req_op(rq) == REQ_OP_DISCARD)
90e98c52 4025 op_type = OBJ_OP_DISCARD;
c2df40df 4026 else if (req_op(rq) == REQ_OP_WRITE)
4027 op_type = OBJ_OP_WRITE;
4028 else
4029 op_type = OBJ_OP_READ;
4030
bc1ecc65 4031 /* Ignore/skip any zero-length requests */
bf0d5f50 4032
4033 if (!length) {
4034 dout("%s: zero-length request\n", __func__);
4035 result = 0;
4036 goto err_rq;
4037 }
bf0d5f50 4038
6d2940c8 4039 /* Only reads are allowed to a read-only device */
bc1ecc65 4040
6d2940c8 4041 if (op_type != OBJ_OP_READ) {
4042 if (rbd_dev->mapping.read_only) {
4043 result = -EROFS;
4044 goto err_rq;
4dda41d3 4045 }
4046 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4047 }
4dda41d3 4048
4049 /*
4050 * Quit early if the mapped snapshot no longer exists. It's
4051 * still possible the snapshot will have disappeared by the
4052 * time our request arrives at the osd, but there's no sense in
4053 * sending it if we already know.
4054 */
4055 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4056 dout("request for non-existent snapshot");
4057 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4058 result = -ENXIO;
4059 goto err_rq;
4060 }
4dda41d3 4061
4062 if (offset && length > U64_MAX - offset + 1) {
4063 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4064 length);
4065 result = -EINVAL;
4066 goto err_rq; /* Shouldn't happen */
4067 }
4dda41d3 4068
4069 blk_mq_start_request(rq);
4070
4071 down_read(&rbd_dev->header_rwsem);
4072 mapping_size = rbd_dev->mapping.size;
6d2940c8 4073 if (op_type != OBJ_OP_READ) {
4074 snapc = rbd_dev->header.snapc;
4075 ceph_get_snap_context(snapc);
ed95b21a 4076 must_be_locked = rbd_is_lock_supported(rbd_dev);
4077 } else {
4078 must_be_locked = rbd_dev->opts->lock_on_read &&
4079 rbd_is_lock_supported(rbd_dev);
4080 }
4081 up_read(&rbd_dev->header_rwsem);
4082
4083 if (offset + length > mapping_size) {
bc1ecc65 4084 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4e752f0a 4085 length, mapping_size);
4086 result = -EIO;
4087 goto err_rq;
4088 }
bf0d5f50 4089
4090 if (must_be_locked) {
4091 down_read(&rbd_dev->lock_rwsem);
4092 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4093 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
ed95b21a 4094 rbd_wait_state_locked(rbd_dev);
4095
4096 WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
4097 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4098 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4099 result = -EBLACKLISTED;
4100 goto err_unlock;
4101 }
4102 }
4103
6d2940c8 4104 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4e752f0a 4105 snapc);
4106 if (!img_request) {
4107 result = -ENOMEM;
ed95b21a 4108 goto err_unlock;
4109 }
4110 img_request->rq = rq;
70b16db8 4111 snapc = NULL; /* img_request consumes a ref */
bf0d5f50 4112
4113 if (op_type == OBJ_OP_DISCARD)
4114 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4115 NULL);
4116 else
4117 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4118 rq->bio);
4119 if (result)
4120 goto err_img_request;
bf0d5f50 4121
4122 result = rbd_img_request_submit(img_request);
4123 if (result)
4124 goto err_img_request;
bf0d5f50 4125
4126 if (must_be_locked)
4127 up_read(&rbd_dev->lock_rwsem);
bc1ecc65 4128 return;
bf0d5f50 4129
4130err_img_request:
4131 rbd_img_request_put(img_request);
4132err_unlock:
4133 if (must_be_locked)
4134 up_read(&rbd_dev->lock_rwsem);
4135err_rq:
4136 if (result)
4137 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
6d2940c8 4138 obj_op_name(op_type), length, offset, result);
e96a650a 4139 ceph_put_snap_context(snapc);
4140err:
4141 blk_mq_end_request(rq, result);
bc1ecc65 4142}
bf0d5f50 4143
4144static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4145 const struct blk_mq_queue_data *bd)
bc1ecc65 4146{
4147 struct request *rq = bd->rq;
4148 struct work_struct *work = blk_mq_rq_to_pdu(rq);
bf0d5f50 4149
4150 queue_work(rbd_wq, work);
4151 return BLK_MQ_RQ_QUEUE_OK;
4152}
4153
4154static void rbd_free_disk(struct rbd_device *rbd_dev)
4155{
4156 struct gendisk *disk = rbd_dev->disk;
4157
4158 if (!disk)
4159 return;
4160
4161 rbd_dev->disk = NULL;
4162 if (disk->flags & GENHD_FL_UP) {
602adf40 4163 del_gendisk(disk);
4164 if (disk->queue)
4165 blk_cleanup_queue(disk->queue);
7ad18afa 4166 blk_mq_free_tag_set(&rbd_dev->tag_set);
a0cab924 4167 }
4168 put_disk(disk);
4169}
4170
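/*
 * Synchronously read up to buf_len bytes from the start of a single
 * object into buf.  Returns the number of bytes read, or a negative
 * error code.
 */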
788e2df3 4171static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4172 struct ceph_object_id *oid,
4173 struct ceph_object_locator *oloc,
4174 void *buf, int buf_len)
4175
4176{
4177 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4178 struct ceph_osd_request *req;
4179 struct page **pages;
4180 int num_pages = calc_pages_for(0, buf_len);
4181 int ret;
4182
4183 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4184 if (!req)
4185 return -ENOMEM;
788e2df3 4186
4187 ceph_oid_copy(&req->r_base_oid, oid);
4188 ceph_oloc_copy(&req->r_base_oloc, oloc);
4189 req->r_flags = CEPH_OSD_FLAG_READ;
430c28c3 4190
fe5478e0 4191 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
788e2df3 4192 if (ret)
fe5478e0 4193 goto out_req;
788e2df3 4194
4195 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4196 if (IS_ERR(pages)) {
4197 ret = PTR_ERR(pages);
4198 goto out_req;
4199 }
1ceae7ef 4200
4201 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4202 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4203 true);
4204
4205 ceph_osdc_start_request(osdc, req, false);
4206 ret = ceph_osdc_wait_request(osdc, req);
4207 if (ret >= 0)
4208 ceph_copy_from_page_vector(pages, buf, 0, ret);
788e2df3 4209
4210out_req:
4211 ceph_osdc_put_request(req);
4212 return ret;
4213}
4214
602adf40 4215/*
4216 * Read the complete header for the given rbd device. On successful
4217 * return, the rbd_dev->header field will contain up-to-date
4218 * information about the image.
602adf40 4219 */
99a41ebc 4220static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
602adf40 4221{
4156d998 4222 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 4223 u32 snap_count = 0;
4224 u64 names_size = 0;
4225 u32 want_count;
4226 int ret;
602adf40 4227
00f1f36f 4228 /*
4229 * The complete header will include an array of its 64-bit
4230 * snapshot ids, followed by the names of those snapshots as
4231 * a contiguous block of NUL-terminated strings. Note that
4232 * the number of snapshots could change by the time we read
4233 * it in, in which case we re-read it.
00f1f36f 4234 */
4235 do {
4236 size_t size;
4237
4238 kfree(ondisk);
4239
4240 size = sizeof (*ondisk);
4241 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4242 size += names_size;
4243 ondisk = kmalloc(size, GFP_KERNEL);
4244 if (!ondisk)
662518b1 4245 return -ENOMEM;
4156d998 4246
4247 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4248 &rbd_dev->header_oloc, ondisk, size);
4156d998 4249 if (ret < 0)
662518b1 4250 goto out;
c0cd10db 4251 if ((size_t)ret < size) {
4156d998 4252 ret = -ENXIO;
4253 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4254 size, ret);
662518b1 4255 goto out;
4256 }
4257 if (!rbd_dev_ondisk_valid(ondisk)) {
4258 ret = -ENXIO;
06ecc6cb 4259 rbd_warn(rbd_dev, "invalid header");
662518b1 4260 goto out;
81e759fb 4261 }
602adf40 4262
4263 names_size = le64_to_cpu(ondisk->snap_names_len);
4264 want_count = snap_count;
4265 snap_count = le32_to_cpu(ondisk->snap_count);
4266 } while (snap_count != want_count);
00f1f36f 4267
4268 ret = rbd_header_from_disk(rbd_dev, ondisk);
4269out:
4270 kfree(ondisk);
4271
4272 return ret;
4273}
4274
4275/*
4276 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4277 * has disappeared from the (just updated) snapshot context.
4278 */
4279static void rbd_exists_validate(struct rbd_device *rbd_dev)
4280{
4281 u64 snap_id;
4282
4283 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4284 return;
4285
4286 snap_id = rbd_dev->spec->snap_id;
4287 if (snap_id == CEPH_NOSNAP)
4288 return;
4289
4290 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4291 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4292}
4293
4294static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4295{
4296 sector_t size;
4297
4298 /*
4299 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4300 * try to update its size. If REMOVING is set, updating size
4301 * is just useless work since the device can't be opened.
9875201e 4302 */
4303 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4304 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4305 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4306 dout("setting size to %llu sectors", (unsigned long long)size);
4307 set_capacity(rbd_dev->disk, size);
4308 revalidate_disk(rbd_dev->disk);
4309 }
4310}
4311
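/*
 * Re-read the image header and, for clones, the parent info, then
 * propagate any size change to the block device.
 */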
cc4a38bd 4312static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 4313{
e627db08 4314 u64 mapping_size;
4315 int ret;
4316
cfbf6377 4317 down_write(&rbd_dev->header_rwsem);
3b5cf2a2 4318 mapping_size = rbd_dev->mapping.size;
4319
4320 ret = rbd_dev_header_info(rbd_dev);
52bb1f9b 4321 if (ret)
73e39e4d 4322 goto out;
15228ede 4323
4324 /*
4325 * If there is a parent, see if it has disappeared due to the
4326 * mapped image getting flattened.
4327 */
4328 if (rbd_dev->parent) {
4329 ret = rbd_dev_v2_parent_info(rbd_dev);
4330 if (ret)
73e39e4d 4331 goto out;
4332 }
4333
5ff1108c 4334 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
73e39e4d 4335 rbd_dev->mapping.size = rbd_dev->header.image_size;
4336 } else {
4337 /* validate mapped snapshot's EXISTS flag */
4338 rbd_exists_validate(rbd_dev);
4339 }
15228ede 4340
73e39e4d 4341out:
cfbf6377 4342 up_write(&rbd_dev->header_rwsem);
73e39e4d 4343 if (!ret && mapping_size != rbd_dev->mapping.size)
9875201e 4344 rbd_dev_update_size(rbd_dev);
1fe5e993 4345
73e39e4d 4346 return ret;
4347}
4348
4349static int rbd_init_request(void *data, struct request *rq,
4350 unsigned int hctx_idx, unsigned int request_idx,
4351 unsigned int numa_node)
4352{
4353 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4354
4355 INIT_WORK(work, rbd_queue_workfn);
4356 return 0;
4357}
4358
4359static struct blk_mq_ops rbd_mq_ops = {
4360 .queue_rq = rbd_queue_rq,
4361 .init_request = rbd_init_request,
4362};
4363
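/*
 * Set up the gendisk and its blk-mq queue.  Each request's pdu is a
 * work_struct (initialized in rbd_init_request()), so dispatching a
 * request is just a matter of queueing its embedded work item.  I/O
 * limits are derived from the object size.
 */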
4364static int rbd_init_disk(struct rbd_device *rbd_dev)
4365{
4366 struct gendisk *disk;
4367 struct request_queue *q;
593a9e7b 4368 u64 segment_size;
7ad18afa 4369 int err;
602adf40 4370
602adf40 4371 /* create gendisk info */
4372 disk = alloc_disk(single_major ?
4373 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4374 RBD_MINORS_PER_MAJOR);
602adf40 4375 if (!disk)
1fcdb8aa 4376 return -ENOMEM;
602adf40 4377
f0f8cef5 4378 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 4379 rbd_dev->dev_id);
602adf40 4380 disk->major = rbd_dev->major;
dd82fff1 4381 disk->first_minor = rbd_dev->minor;
4382 if (single_major)
4383 disk->flags |= GENHD_FL_EXT_DEVT;
4384 disk->fops = &rbd_bd_ops;
4385 disk->private_data = rbd_dev;
4386
4387 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4388 rbd_dev->tag_set.ops = &rbd_mq_ops;
b5584180 4389 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
7ad18afa 4390 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
b5584180 4391 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4392 rbd_dev->tag_set.nr_hw_queues = 1;
4393 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4394
4395 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4396 if (err)
602adf40 4397 goto out_disk;
029bcbd8 4398
4399 q = blk_mq_init_queue(&rbd_dev->tag_set);
4400 if (IS_ERR(q)) {
4401 err = PTR_ERR(q);
4402 goto out_tag_set;
4403 }
4404
4405 queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4406 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
593a9e7b 4407
029bcbd8 4408 /* set io sizes to object size */
4409 segment_size = rbd_obj_bytes(&rbd_dev->header);
4410 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
0d9fde4f 4411 q->limits.max_sectors = queue_max_hw_sectors(q);
d3834fef 4412 blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
4413 blk_queue_max_segment_size(q, segment_size);
4414 blk_queue_io_min(q, segment_size);
4415 blk_queue_io_opt(q, segment_size);
029bcbd8 4416
4417 /* enable the discard support */
4418 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4419 q->limits.discard_granularity = segment_size;
4420 q->limits.discard_alignment = segment_size;
2bb4cd5c 4421 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
b76f8239 4422 q->limits.discard_zeroes_data = 1;
90e98c52 4423
4424 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4425 q->backing_dev_info.capabilities |= BDI_CAP_STABLE_WRITES;
4426
4427 disk->queue = q;
4428
4429 q->queuedata = rbd_dev;
4430
4431 rbd_dev->disk = disk;
602adf40 4432
602adf40 4433 return 0;
4434out_tag_set:
4435 blk_mq_free_tag_set(&rbd_dev->tag_set);
4436out_disk:
4437 put_disk(disk);
7ad18afa 4438 return err;
4439}
4440
4441/*
4442 sysfs
4443*/
4444
4445static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4446{
4447 return container_of(dev, struct rbd_device, dev);
4448}
4449
4450static ssize_t rbd_size_show(struct device *dev,
4451 struct device_attribute *attr, char *buf)
4452{
593a9e7b 4453 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 4454
4455 return sprintf(buf, "%llu\n",
4456 (unsigned long long)rbd_dev->mapping.size);
4457}
4458
4459/*
4460 * Note this shows the features for whatever's mapped, which is not
4461 * necessarily the base image.
4462 */
4463static ssize_t rbd_features_show(struct device *dev,
4464 struct device_attribute *attr, char *buf)
4465{
4466 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4467
4468 return sprintf(buf, "0x%016llx\n",
fc71d833 4469 (unsigned long long)rbd_dev->mapping.features);
4470}
4471
4472static ssize_t rbd_major_show(struct device *dev,
4473 struct device_attribute *attr, char *buf)
4474{
593a9e7b 4475 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4476
4477 if (rbd_dev->major)
4478 return sprintf(buf, "%d\n", rbd_dev->major);
4479
4480 return sprintf(buf, "(none)\n");
4481}
4482
4483static ssize_t rbd_minor_show(struct device *dev,
4484 struct device_attribute *attr, char *buf)
4485{
4486 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
fc71d833 4487
dd82fff1 4488 return sprintf(buf, "%d\n", rbd_dev->minor);
4489}
4490
4491static ssize_t rbd_client_addr_show(struct device *dev,
4492 struct device_attribute *attr, char *buf)
4493{
4494 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4495 struct ceph_entity_addr *client_addr =
4496 ceph_client_addr(rbd_dev->rbd_client->client);
4497
4498 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4499 le32_to_cpu(client_addr->nonce));
4500}
4501
4502static ssize_t rbd_client_id_show(struct device *dev,
4503 struct device_attribute *attr, char *buf)
602adf40 4504{
593a9e7b 4505 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4506
1dbb4399 4507 return sprintf(buf, "client%lld\n",
033268a5 4508 ceph_client_gid(rbd_dev->rbd_client->client));
4509}
4510
4511static ssize_t rbd_cluster_fsid_show(struct device *dev,
4512 struct device_attribute *attr, char *buf)
4513{
4514 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4515
4516 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4517}
4518
4519static ssize_t rbd_config_info_show(struct device *dev,
4520 struct device_attribute *attr, char *buf)
4521{
4522 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4523
4524 return sprintf(buf, "%s\n", rbd_dev->config_info);
4525}
4526
4527static ssize_t rbd_pool_show(struct device *dev,
4528 struct device_attribute *attr, char *buf)
602adf40 4529{
593a9e7b 4530 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4531
0d7dbfce 4532 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4533}
4534
4535static ssize_t rbd_pool_id_show(struct device *dev,
4536 struct device_attribute *attr, char *buf)
4537{
4538 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4539
0d7dbfce 4540 return sprintf(buf, "%llu\n",
fc71d833 4541 (unsigned long long) rbd_dev->spec->pool_id);
4542}
4543
4544static ssize_t rbd_name_show(struct device *dev,
4545 struct device_attribute *attr, char *buf)
4546{
593a9e7b 4547 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4548
4549 if (rbd_dev->spec->image_name)
4550 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4551
4552 return sprintf(buf, "(unknown)\n");
4553}
4554
4555static ssize_t rbd_image_id_show(struct device *dev,
4556 struct device_attribute *attr, char *buf)
4557{
4558 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4559
0d7dbfce 4560 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4561}
4562
4563/*
4564 * Shows the name of the currently-mapped snapshot (or
4565 * RBD_SNAP_HEAD_NAME for the base image).
4566 */
4567static ssize_t rbd_snap_show(struct device *dev,
4568 struct device_attribute *attr,
4569 char *buf)
4570{
593a9e7b 4571 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4572
0d7dbfce 4573 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4574}
4575
4576static ssize_t rbd_snap_id_show(struct device *dev,
4577 struct device_attribute *attr, char *buf)
4578{
4579 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4580
4581 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4582}
4583
86b00e0d 4584/*
4585 * For a v2 image, shows the chain of parent images, separated by empty
4586 * lines. For v1 images or if there is no parent, shows "(no parent
4587 * image)".
4588 */
4589static ssize_t rbd_parent_show(struct device *dev,
4590 struct device_attribute *attr,
4591 char *buf)
4592{
4593 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
ff96128f 4594 ssize_t count = 0;
86b00e0d 4595
ff96128f 4596 if (!rbd_dev->parent)
4597 return sprintf(buf, "(no parent image)\n");
4598
4599 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4600 struct rbd_spec *spec = rbd_dev->parent_spec;
4601
4602 count += sprintf(&buf[count], "%s"
4603 "pool_id %llu\npool_name %s\n"
4604 "image_id %s\nimage_name %s\n"
4605 "snap_id %llu\nsnap_name %s\n"
4606 "overlap %llu\n",
4607 !count ? "" : "\n", /* first? */
4608 spec->pool_id, spec->pool_name,
4609 spec->image_id, spec->image_name ?: "(unknown)",
4610 spec->snap_id, spec->snap_name,
4611 rbd_dev->parent_overlap);
4612 }
4613
4614 return count;
4615}
4616
4617static ssize_t rbd_image_refresh(struct device *dev,
4618 struct device_attribute *attr,
4619 const char *buf,
4620 size_t size)
4621{
593a9e7b 4622 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 4623 int ret;
602adf40 4624
cc4a38bd 4625 ret = rbd_dev_refresh(rbd_dev);
e627db08 4626 if (ret)
52bb1f9b 4627 return ret;
b813623a 4628
52bb1f9b 4629 return size;
dfc5606d 4630}
602adf40 4631
dfc5606d 4632static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 4633static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d 4634static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
dd82fff1 4635static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
005a07bf 4636static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
dfc5606d 4637static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
267fb90b 4638static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
0d6d1e9c 4639static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
dfc5606d 4640static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 4641static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 4642static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 4643static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4644static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4645static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
92a58671 4646static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
86b00e0d 4647static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4648
4649static struct attribute *rbd_attrs[] = {
4650 &dev_attr_size.attr,
34b13184 4651 &dev_attr_features.attr,
dfc5606d 4652 &dev_attr_major.attr,
dd82fff1 4653 &dev_attr_minor.attr,
005a07bf 4654 &dev_attr_client_addr.attr,
dfc5606d 4655 &dev_attr_client_id.attr,
267fb90b 4656 &dev_attr_cluster_fsid.attr,
0d6d1e9c 4657 &dev_attr_config_info.attr,
dfc5606d 4658 &dev_attr_pool.attr,
9bb2f334 4659 &dev_attr_pool_id.attr,
dfc5606d 4660 &dev_attr_name.attr,
589d30e0 4661 &dev_attr_image_id.attr,
dfc5606d 4662 &dev_attr_current_snap.attr,
92a58671 4663 &dev_attr_snap_id.attr,
86b00e0d 4664 &dev_attr_parent.attr,
dfc5606d 4665 &dev_attr_refresh.attr,
4666 NULL
4667};
4668
4669static struct attribute_group rbd_attr_group = {
4670 .attrs = rbd_attrs,
4671};
4672
4673static const struct attribute_group *rbd_attr_groups[] = {
4674 &rbd_attr_group,
4675 NULL
4676};
4677
6cac4695 4678static void rbd_dev_release(struct device *dev);
4679
4680static struct device_type rbd_device_type = {
4681 .name = "rbd",
4682 .groups = rbd_attr_groups,
6cac4695 4683 .release = rbd_dev_release,
dfc5606d
YS
4684};
4685
4686static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4687{
4688 kref_get(&spec->kref);
4689
4690 return spec;
4691}
4692
4693static void rbd_spec_free(struct kref *kref);
4694static void rbd_spec_put(struct rbd_spec *spec)
4695{
4696 if (spec)
4697 kref_put(&spec->kref, rbd_spec_free);
4698}
4699
4700static struct rbd_spec *rbd_spec_alloc(void)
4701{
4702 struct rbd_spec *spec;
4703
4704 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4705 if (!spec)
4706 return NULL;
4707
4708 spec->pool_id = CEPH_NOPOOL;
4709 spec->snap_id = CEPH_NOSNAP;
4710 kref_init(&spec->kref);
4711
4712 return spec;
4713}
4714
4715static void rbd_spec_free(struct kref *kref)
4716{
4717 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4718
4719 kfree(spec->pool_name);
4720 kfree(spec->image_id);
4721 kfree(spec->image_name);
4722 kfree(spec->snap_name);
4723 kfree(spec);
4724}
4725
1643dfa4 4726static void rbd_dev_free(struct rbd_device *rbd_dev)
dd5ac32d 4727{
99d16943 4728 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
ed95b21a 4729 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
dd5ac32d 4730
c41d13a3 4731 ceph_oid_destroy(&rbd_dev->header_oid);
6b6dddbe 4732 ceph_oloc_destroy(&rbd_dev->header_oloc);
0d6d1e9c 4733 kfree(rbd_dev->config_info);
c41d13a3 4734
4735 rbd_put_client(rbd_dev->rbd_client);
4736 rbd_spec_put(rbd_dev->spec);
4737 kfree(rbd_dev->opts);
4738 kfree(rbd_dev);
4739}
4740
4741static void rbd_dev_release(struct device *dev)
4742{
4743 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4744 bool need_put = !!rbd_dev->opts;
4745
4746 if (need_put) {
4747 destroy_workqueue(rbd_dev->task_wq);
4748 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4749 }
4750
4751 rbd_dev_free(rbd_dev);
4752
4753 /*
4754 * This is racy, but way better than putting module outside of
4755 * the release callback. The race window is pretty small, so
4756 * doing something similar to dm (dm-builtin.c) is overkill.
4757 */
4758 if (need_put)
4759 module_put(THIS_MODULE);
4760}
4761
4762static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4763 struct rbd_spec *spec)
4764{
4765 struct rbd_device *rbd_dev;
4766
1643dfa4 4767 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4768 if (!rbd_dev)
4769 return NULL;
4770
4771 spin_lock_init(&rbd_dev->lock);
4772 INIT_LIST_HEAD(&rbd_dev->node);
4773 init_rwsem(&rbd_dev->header_rwsem);
4774
7e97332e 4775 rbd_dev->header.data_pool_id = CEPH_NOPOOL;
c41d13a3 4776 ceph_oid_init(&rbd_dev->header_oid);
431a02cd 4777 rbd_dev->header_oloc.pool = spec->pool_id;
c41d13a3 4778
4779 mutex_init(&rbd_dev->watch_mutex);
4780 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4781 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4782
4783 init_rwsem(&rbd_dev->lock_rwsem);
4784 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4785 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4786 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4787 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4788 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4789 init_waitqueue_head(&rbd_dev->lock_waitq);
4790
4791 rbd_dev->dev.bus = &rbd_bus_type;
4792 rbd_dev->dev.type = &rbd_device_type;
4793 rbd_dev->dev.parent = &rbd_root_dev;
4794 device_initialize(&rbd_dev->dev);
4795
c53d5893 4796 rbd_dev->rbd_client = rbdc;
d147543d 4797 rbd_dev->spec = spec;
0903e875 4798
4799 return rbd_dev;
4800}
4801
4802/*
4803 * Create a mapping rbd_dev.
4804 */
4805static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4806 struct rbd_spec *spec,
4807 struct rbd_options *opts)
4808{
4809 struct rbd_device *rbd_dev;
4810
4811 rbd_dev = __rbd_dev_create(rbdc, spec);
4812 if (!rbd_dev)
4813 return NULL;
4814
4815 rbd_dev->opts = opts;
4816
4817 /* get an id and fill in device name */
4818 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4819 minor_to_rbd_dev_id(1 << MINORBITS),
4820 GFP_KERNEL);
4821 if (rbd_dev->dev_id < 0)
4822 goto fail_rbd_dev;
4823
4824 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4825 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4826 rbd_dev->name);
4827 if (!rbd_dev->task_wq)
4828 goto fail_dev_id;
dd5ac32d 4829
4830 /* we have a ref from do_rbd_add() */
4831 __module_get(THIS_MODULE);
dd5ac32d 4832
1643dfa4 4833 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
c53d5893 4834 return rbd_dev;
4835
4836fail_dev_id:
4837 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4838fail_rbd_dev:
4839 rbd_dev_free(rbd_dev);
4840 return NULL;
4841}
4842
4843static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4844{
4845 if (rbd_dev)
4846 put_device(&rbd_dev->dev);
4847}
4848
4849/*
4850 * Get the size and object order for an image snapshot, or if
4851 * snap_id is CEPH_NOSNAP, gets this information for the base
4852 * image.
4853 */
4854static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4855 u8 *order, u64 *snap_size)
4856{
4857 __le64 snapid = cpu_to_le64(snap_id);
4858 int ret;
4859 struct {
4860 u8 order;
4861 __le64 size;
4862 } __attribute__ ((packed)) size_buf = { 0 };
4863
4864 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4865 &rbd_dev->header_oloc, "get_size",
4866 &snapid, sizeof(snapid),
4867 &size_buf, sizeof(size_buf));
36be9a76 4868 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4869 if (ret < 0)
4870 return ret;
4871 if (ret < sizeof (size_buf))
4872 return -ERANGE;
9d475de5 4873
c3545579 4874 if (order) {
c86f86e9 4875 *order = size_buf.order;
4876 dout(" order %u", (unsigned int)*order);
4877 }
4878 *snap_size = le64_to_cpu(size_buf.size);
4879
4880 dout(" snap_id 0x%016llx snap_size = %llu\n",
4881 (unsigned long long)snap_id,
57385b51 4882 (unsigned long long)*snap_size);
4883
4884 return 0;
4885}
4886
4887static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4888{
4889 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4890 &rbd_dev->header.obj_order,
4891 &rbd_dev->header.image_size);
4892}
4893
4894static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4895{
4896 void *reply_buf;
4897 int ret;
4898 void *p;
4899
4900 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4901 if (!reply_buf)
4902 return -ENOMEM;
4903
4904 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4905 &rbd_dev->header_oloc, "get_object_prefix",
4906 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 4907 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4908 if (ret < 0)
4909 goto out;
4910
4911 p = reply_buf;
4912 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4913 p + ret, NULL, GFP_NOIO);
4914 ret = 0;
4915
4916 if (IS_ERR(rbd_dev->header.object_prefix)) {
4917 ret = PTR_ERR(rbd_dev->header.object_prefix);
4918 rbd_dev->header.object_prefix = NULL;
4919 } else {
4920 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
4921 }
4922out:
4923 kfree(reply_buf);
4924
4925 return ret;
4926}
4927
4928static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4929 u64 *snap_features)
4930{
4931 __le64 snapid = cpu_to_le64(snap_id);
4932 struct {
4933 __le64 features;
4934 __le64 incompat;
4157976b 4935 } __attribute__ ((packed)) features_buf = { 0 };
d3767f0f 4936 u64 unsup;
4937 int ret;
4938
4939 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4940 &rbd_dev->header_oloc, "get_features",
4941 &snapid, sizeof(snapid),
4942 &features_buf, sizeof(features_buf));
36be9a76 4943 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4944 if (ret < 0)
4945 return ret;
4946 if (ret < sizeof (features_buf))
4947 return -ERANGE;
d889140c 4948
4949 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4950 if (unsup) {
4951 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4952 unsup);
b8f5c6ed 4953 return -ENXIO;
d3767f0f 4954 }
d889140c 4955
4956 *snap_features = le64_to_cpu(features_buf.features);
4957
4958 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4959 (unsigned long long)snap_id,
4960 (unsigned long long)*snap_features,
4961 (unsigned long long)le64_to_cpu(features_buf.incompat));
4962
4963 return 0;
4964}
4965
4966static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4967{
4968 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4969 &rbd_dev->header.features);
4970}
4971
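/*
 * Query the parent (pool, image, snap and overlap) of a v2 image.  A
 * CEPH_NOPOOL pool id means there is no parent, or that the image got
 * flattened since we last looked; in the latter case the overlap is
 * zeroed so that new requests stop being routed to the parent.
 */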
4972static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4973{
4974 struct rbd_spec *parent_spec;
4975 size_t size;
4976 void *reply_buf = NULL;
4977 __le64 snapid;
4978 void *p;
4979 void *end;
642a2537 4980 u64 pool_id;
86b00e0d 4981 char *image_id;
3b5cf2a2 4982 u64 snap_id;
86b00e0d 4983 u64 overlap;
4984 int ret;
4985
4986 parent_spec = rbd_spec_alloc();
4987 if (!parent_spec)
4988 return -ENOMEM;
4989
4990 size = sizeof (__le64) + /* pool_id */
4991 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
4992 sizeof (__le64) + /* snap_id */
4993 sizeof (__le64); /* overlap */
4994 reply_buf = kmalloc(size, GFP_KERNEL);
4995 if (!reply_buf) {
4996 ret = -ENOMEM;
4997 goto out_err;
4998 }
4999
4d9b67cd 5000 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5001 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5002 &rbd_dev->header_oloc, "get_parent",
5003 &snapid, sizeof(snapid), reply_buf, size);
36be9a76 5004 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5005 if (ret < 0)
5006 goto out_err;
5007
86b00e0d 5008 p = reply_buf;
5009 end = reply_buf + ret;
5010 ret = -ERANGE;
642a2537 5011 ceph_decode_64_safe(&p, end, pool_id, out_err);
5012 if (pool_id == CEPH_NOPOOL) {
5013 /*
5014 * Either the parent never existed, or we have
 5015 * a record of it but the image got flattened so it no
5016 * longer has a parent. When the parent of a
5017 * layered image disappears we immediately set the
5018 * overlap to 0. The effect of this is that all new
5019 * requests will be treated as if the image had no
5020 * parent.
5021 */
5022 if (rbd_dev->parent_overlap) {
5023 rbd_dev->parent_overlap = 0;
5024 rbd_dev_parent_put(rbd_dev);
5025 pr_info("%s: clone image has been flattened\n",
5026 rbd_dev->disk->disk_name);
5027 }
5028
86b00e0d 5029 goto out; /* No parent? No problem. */
392a9dad 5030 }
86b00e0d 5031
5032 /* The ceph file layout needs to fit pool id in 32 bits */
5033
5034 ret = -EIO;
642a2537 5035 if (pool_id > (u64)U32_MAX) {
9584d508 5036 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
642a2537 5037 (unsigned long long)pool_id, U32_MAX);
57385b51 5038 goto out_err;
c0cd10db 5039 }
0903e875 5040
979ed480 5041 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5042 if (IS_ERR(image_id)) {
5043 ret = PTR_ERR(image_id);
5044 goto out_err;
5045 }
3b5cf2a2 5046 ceph_decode_64_safe(&p, end, snap_id, out_err);
5047 ceph_decode_64_safe(&p, end, overlap, out_err);
5048
5049 /*
5050 * The parent won't change (except when the clone is
 5051 * flattened, which is already handled). So we only need to
 5052 * record the parent spec if we have not already done so.
5053 */
5054 if (!rbd_dev->parent_spec) {
5055 parent_spec->pool_id = pool_id;
5056 parent_spec->image_id = image_id;
5057 parent_spec->snap_id = snap_id;
5058 rbd_dev->parent_spec = parent_spec;
5059 parent_spec = NULL; /* rbd_dev now owns this */
5060 } else {
5061 kfree(image_id);
5062 }
5063
5064 /*
5065 * We always update the parent overlap. If it's zero we issue
5066 * a warning, as we will proceed as if there was no parent.
3b5cf2a2 5067 */
3b5cf2a2 5068 if (!overlap) {
3b5cf2a2 5069 if (parent_spec) {
5070 /* refresh, careful to warn just once */
5071 if (rbd_dev->parent_overlap)
5072 rbd_warn(rbd_dev,
5073 "clone now standalone (overlap became 0)");
3b5cf2a2 5074 } else {
5075 /* initial probe */
5076 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
3b5cf2a2 5077 }
70cf49cf 5078 }
5079 rbd_dev->parent_overlap = overlap;
5080
5081out:
5082 ret = 0;
5083out_err:
5084 kfree(reply_buf);
5085 rbd_spec_put(parent_spec);
5086
5087 return ret;
5088}
5089
5090static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5091{
5092 struct {
5093 __le64 stripe_unit;
5094 __le64 stripe_count;
5095 } __attribute__ ((packed)) striping_info_buf = { 0 };
5096 size_t size = sizeof (striping_info_buf);
5097 void *p;
5098 u64 obj_size;
5099 u64 stripe_unit;
5100 u64 stripe_count;
5101 int ret;
5102
5103 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5104 &rbd_dev->header_oloc, "get_stripe_unit_count",
5105 NULL, 0, &striping_info_buf, size);
5106 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5107 if (ret < 0)
5108 return ret;
5109 if (ret < size)
5110 return -ERANGE;
5111
5112 /*
5113 * We don't actually support the "fancy striping" feature
5114 * (STRIPINGV2) yet, but if the striping sizes are the
5115 * defaults the behavior is the same as before. So find
5116 * out, and only fail if the image has non-default values.
5117 */
5118 ret = -EINVAL;
5bc3fb17 5119 obj_size = rbd_obj_bytes(&rbd_dev->header);
cc070d59
AE
5120 p = &striping_info_buf;
5121 stripe_unit = ceph_decode_64(&p);
5122 if (stripe_unit != obj_size) {
5123 rbd_warn(rbd_dev, "unsupported stripe unit "
5124 "(got %llu want %llu)",
5125 stripe_unit, obj_size);
5126 return -EINVAL;
5127 }
5128 stripe_count = ceph_decode_64(&p);
5129 if (stripe_count != 1) {
5130 rbd_warn(rbd_dev, "unsupported stripe count "
5131 "(got %llu want 1)", stripe_count);
5132 return -EINVAL;
5133 }
500d0c0f
AE
5134 rbd_dev->header.stripe_unit = stripe_unit;
5135 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
5136
5137 return 0;
5138}
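/*
 * Hedged sketch: the "default striping" condition enforced above,
 * factored as a predicate.  This helper is hypothetical, not part of
 * the driver; pre-STRIPINGV2 behavior requires the stripe unit to
 * equal the object size and the stripe count to be 1.
 */
static inline bool rbd_striping_is_default(u64 stripe_unit,
					   u64 stripe_count, u64 obj_size)
{
	return stripe_unit == obj_size && stripe_count == 1;
}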
5139
7e97332e
ID
5140static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5141{
5142 __le64 data_pool_id;
5143 int ret;
5144
5145 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5146 &rbd_dev->header_oloc, "get_data_pool",
5147 NULL, 0, &data_pool_id, sizeof(data_pool_id));
5148 if (ret < 0)
5149 return ret;
5150 if (ret < sizeof(data_pool_id))
5151 return -EBADMSG;
5152
5153 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5154 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5155 return 0;
5156}
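/*
 * Note (an assumption drawn from how data_pool_id is used, not a
 * statement made by this function): with RBD_FEATURE_DATA_POOL only
 * the data objects live in the separate pool recorded above; the
 * header object stays in the pool the image was mapped from, which is
 * why header_oloc is left untouched here.
 */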
5157
9e15b77d
AE
5158static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5159{
ecd4a68a 5160 CEPH_DEFINE_OID_ONSTACK(oid);
9e15b77d
AE
5161 size_t image_id_size;
5162 char *image_id;
5163 void *p;
5164 void *end;
5165 size_t size;
5166 void *reply_buf = NULL;
5167 size_t len = 0;
5168 char *image_name = NULL;
5169 int ret;
5170
5171 rbd_assert(!rbd_dev->spec->image_name);
5172
69e7a02f
AE
5173 len = strlen(rbd_dev->spec->image_id);
5174 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
5175 image_id = kmalloc(image_id_size, GFP_KERNEL);
5176 if (!image_id)
5177 return NULL;
5178
5179 p = image_id;
4157976b 5180 end = image_id + image_id_size;
57385b51 5181 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
5182
5183 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5184 reply_buf = kmalloc(size, GFP_KERNEL);
5185 if (!reply_buf)
5186 goto out;
5187
ecd4a68a
ID
5188 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5189 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5190 "dir_get_name", image_id, image_id_size,
5191 reply_buf, size);
9e15b77d
AE
5192 if (ret < 0)
5193 goto out;
5194 p = reply_buf;
f40eb349
AE
5195 end = reply_buf + ret;
5196
9e15b77d
AE
5197 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5198 if (IS_ERR(image_name))
5199 image_name = NULL;
5200 else
5201 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5202out:
5203 kfree(reply_buf);
5204 kfree(image_id);
5205
5206 return image_name;
5207}
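/*
 * Illustration (values hypothetical): the RBD_DIRECTORY object keeps
 * the id -> name mapping consulted above, so a call such as
 * dir_get_name("10052ae8944a") would yield "foo" for an image named
 * "foo".  Failure is tolerated; the caller merely warns when no name
 * can be recovered.
 */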
5208
2ad3d716
AE
5209static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5210{
5211 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5212 const char *snap_name;
5213 u32 which = 0;
5214
5215 /* Skip over names until we find the one we are looking for */
5216
5217 snap_name = rbd_dev->header.snap_names;
5218 while (which < snapc->num_snaps) {
5219 if (!strcmp(name, snap_name))
5220 return snapc->snaps[which];
5221 snap_name += strlen(snap_name) + 1;
5222 which++;
5223 }
5224 return CEPH_NOSNAP;
5225}
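/*
 * Illustrative layout (an assumption based on the walk above): v1
 * snapshot names are packed back to back, each NUL-terminated, in the
 * same order as the ids in snapc->snaps[]:
 *
 *	"snap1\0snap2\0snap3\0"
 *	 snaps[0] snaps[1] snaps[2]
 */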
5226
5227static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5228{
5229 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5230 u32 which;
5231 bool found = false;
5232 u64 snap_id;
5233
5234 for (which = 0; !found && which < snapc->num_snaps; which++) {
5235 const char *snap_name;
5236
5237 snap_id = snapc->snaps[which];
5238 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
efadc98a
JD
5239 if (IS_ERR(snap_name)) {
5240 /* ignore no-longer existing snapshots */
5241 if (PTR_ERR(snap_name) == -ENOENT)
5242 continue;
5243 else
5244 break;
5245 }
2ad3d716
AE
5246 found = !strcmp(name, snap_name);
5247 kfree(snap_name);
5248 }
5249 return found ? snap_id : CEPH_NOSNAP;
5250}
5251
5252/*
5253 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5254 * no snapshot by that name is found, or if an error occurs.
5255 */
5256static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5257{
5258 if (rbd_dev->image_format == 1)
5259 return rbd_v1_snap_id_by_name(rbd_dev, name);
5260
5261 return rbd_v2_snap_id_by_name(rbd_dev, name);
5262}
5263
9e15b77d 5264/*
04077599
ID
5265 * An image being mapped will have everything but the snap id.
5266 */
5267static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5268{
5269 struct rbd_spec *spec = rbd_dev->spec;
5270
5271 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5272 rbd_assert(spec->image_id && spec->image_name);
5273 rbd_assert(spec->snap_name);
5274
5275 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5276 u64 snap_id;
5277
5278 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5279 if (snap_id == CEPH_NOSNAP)
5280 return -ENOENT;
5281
5282 spec->snap_id = snap_id;
5283 } else {
5284 spec->snap_id = CEPH_NOSNAP;
5285 }
5286
5287 return 0;
5288}
5289
5290/*
5291 * A parent image will have all ids but none of the names.
e1d4213f 5292 *
04077599
ID
5293 * All names in an rbd spec are dynamically allocated. It's OK if we
5294 * can't figure out the name for an image id.
9e15b77d 5295 */
04077599 5296static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
9e15b77d 5297{
2e9f7f1c
AE
5298 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5299 struct rbd_spec *spec = rbd_dev->spec;
5300 const char *pool_name;
5301 const char *image_name;
5302 const char *snap_name;
9e15b77d
AE
5303 int ret;
5304
04077599
ID
5305 rbd_assert(spec->pool_id != CEPH_NOPOOL);
5306 rbd_assert(spec->image_id);
5307 rbd_assert(spec->snap_id != CEPH_NOSNAP);
9e15b77d 5308
2e9f7f1c 5309 /* Get the pool name; we have to make our own copy of this */
9e15b77d 5310
2e9f7f1c
AE
5311 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5312 if (!pool_name) {
5313 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
5314 return -EIO;
5315 }
2e9f7f1c
AE
5316 pool_name = kstrdup(pool_name, GFP_KERNEL);
5317 if (!pool_name)
9e15b77d
AE
5318 return -ENOMEM;
5319
5320 /* Fetch the image name; tolerate failure here */
5321
2e9f7f1c
AE
5322 image_name = rbd_dev_image_name(rbd_dev);
5323 if (!image_name)
06ecc6cb 5324 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 5325
04077599 5326 /* Fetch the snapshot name */
9e15b77d 5327
2e9f7f1c 5328 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
da6a6b63
JD
5329 if (IS_ERR(snap_name)) {
5330 ret = PTR_ERR(snap_name);
9e15b77d 5331 goto out_err;
2e9f7f1c
AE
5332 }
5333
5334 spec->pool_name = pool_name;
5335 spec->image_name = image_name;
5336 spec->snap_name = snap_name;
9e15b77d
AE
5337
5338 return 0;
04077599 5339
9e15b77d 5340out_err:
2e9f7f1c
AE
5341 kfree(image_name);
5342 kfree(pool_name);
9e15b77d
AE
5343 return ret;
5344}
5345
cc4a38bd 5346static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
5347{
5348 size_t size;
5349 int ret;
5350 void *reply_buf;
5351 void *p;
5352 void *end;
5353 u64 seq;
5354 u32 snap_count;
5355 struct ceph_snap_context *snapc;
5356 u32 i;
5357
5358 /*
5359 * We'll need room for the seq value (maximum snapshot id),
5360 * snapshot count, and an array of that many snapshot ids.
5361 * For now we have a fixed upper limit on the number we're
5362 * prepared to receive.
5363 */
5364 size = sizeof (__le64) + sizeof (__le32) +
5365 RBD_MAX_SNAP_COUNT * sizeof (__le64);
5366 reply_buf = kzalloc(size, GFP_KERNEL);
5367 if (!reply_buf)
5368 return -ENOMEM;
5369
ecd4a68a
ID
5370 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5371 &rbd_dev->header_oloc, "get_snapcontext",
5372 NULL, 0, reply_buf, size);
36be9a76 5373 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
5374 if (ret < 0)
5375 goto out;
5376
35d489f9 5377 p = reply_buf;
57385b51
AE
5378 end = reply_buf + ret;
5379 ret = -ERANGE;
35d489f9
AE
5380 ceph_decode_64_safe(&p, end, seq, out);
5381 ceph_decode_32_safe(&p, end, snap_count, out);
5382
5383 /*
5384 * Make sure the reported number of snapshot ids wouldn't go
5385 * beyond the end of our buffer. But before checking that,
5386 * make sure the computed size of the snapshot context we
5387 * allocate is representable in a size_t.
5388 */
5389 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5390 / sizeof (u64)) {
5391 ret = -EINVAL;
5392 goto out;
5393 }
5394 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5395 goto out;
468521c1 5396 ret = 0;
35d489f9 5397
812164f8 5398 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
5399 if (!snapc) {
5400 ret = -ENOMEM;
5401 goto out;
5402 }
35d489f9 5403 snapc->seq = seq;
35d489f9
AE
5404 for (i = 0; i < snap_count; i++)
5405 snapc->snaps[i] = ceph_decode_64(&p);
5406
49ece554 5407 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
5408 rbd_dev->header.snapc = snapc;
5409
5410 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 5411 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
5412out:
5413 kfree(reply_buf);
5414
57385b51 5415 return ret;
35d489f9
AE
5416}
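/*
 * Sketch of the "get_snapcontext" reply decoded above (field order
 * taken from the ceph_decode_* calls; this is annotation, not the
 * class definition itself):
 *
 *	__le64 seq;			highest snapshot id
 *	__le32 snap_count;
 *	__le64 snaps[snap_count];	one id per snapshot
 */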
5417
54cac61f
AE
5418static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5419 u64 snap_id)
b8b1e2db
AE
5420{
5421 size_t size;
5422 void *reply_buf;
54cac61f 5423 __le64 snapid;
b8b1e2db
AE
5424 int ret;
5425 void *p;
5426 void *end;
b8b1e2db
AE
5427 char *snap_name;
5428
5429 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5430 reply_buf = kmalloc(size, GFP_KERNEL);
5431 if (!reply_buf)
5432 return ERR_PTR(-ENOMEM);
5433
54cac61f 5434 snapid = cpu_to_le64(snap_id);
ecd4a68a
ID
5435 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5436 &rbd_dev->header_oloc, "get_snapshot_name",
5437 &snapid, sizeof(snapid), reply_buf, size);
36be9a76 5438 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
5439 if (ret < 0) {
5440 snap_name = ERR_PTR(ret);
b8b1e2db 5441 goto out;
f40eb349 5442 }
b8b1e2db
AE
5443
5444 p = reply_buf;
f40eb349 5445 end = reply_buf + ret;
e5c35534 5446 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 5447 if (IS_ERR(snap_name))
b8b1e2db 5448 goto out;
b8b1e2db 5449
f40eb349 5450 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 5451 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
5452out:
5453 kfree(reply_buf);
5454
f40eb349 5455 return snap_name;
b8b1e2db
AE
5456}
5457
2df3fac7 5458static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 5459{
2df3fac7 5460 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 5461 int ret;
117973fb 5462
1617e40c
JD
5463 ret = rbd_dev_v2_image_size(rbd_dev);
5464 if (ret)
cfbf6377 5465 return ret;
1617e40c 5466
2df3fac7
AE
5467 if (first_time) {
5468 ret = rbd_dev_v2_header_onetime(rbd_dev);
5469 if (ret)
cfbf6377 5470 return ret;
2df3fac7
AE
5471 }
5472
cc4a38bd 5473 ret = rbd_dev_v2_snap_context(rbd_dev);
d194cd1d
ID
5474 if (ret && first_time) {
5475 kfree(rbd_dev->header.object_prefix);
5476 rbd_dev->header.object_prefix = NULL;
5477 }
117973fb
AE
5478
5479 return ret;
5480}
5481
a720ae09
ID
5482static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5483{
5484 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5485
5486 if (rbd_dev->image_format == 1)
5487 return rbd_dev_v1_header_info(rbd_dev);
5488
5489 return rbd_dev_v2_header_info(rbd_dev);
5490}
5491
e28fff26
AE
5492/*
5493 * Skips over white space at *buf, and updates *buf to point to the
5494 * first found non-space character (if any). Returns the length of
593a9e7b
AE
5495 * the token (string of non-white space characters) found. Note
5496 * that *buf must be terminated with '\0'.
e28fff26
AE
5497 */
5498static inline size_t next_token(const char **buf)
5499{
5500 /*
5501 * These are the characters that produce nonzero for
5502 * isspace() in the "C" and "POSIX" locales.
5503 */
5504 const char *spaces = " \f\n\r\t\v";
5505
5506 *buf += strspn(*buf, spaces); /* Find start of token */
5507
5508 return strcspn(*buf, spaces); /* Return token length */
5509}
5510
ea3352f4
AE
5511/*
5512 * Finds the next token in *buf, dynamically allocates a buffer big
5513 * enough to hold a copy of it, and copies the token into the new
5514 * buffer. The copy is guaranteed to be terminated with '\0'. Note
5515 * that a duplicate buffer is created even for a zero-length token.
5516 *
5517 * Returns a pointer to the newly-allocated duplicate, or a null
5518 * pointer if memory for the duplicate was not available. If
5519 * the lenp argument is a non-null pointer, the length of the token
5520 * (not including the '\0') is returned in *lenp.
5521 *
5522 * If successful, the *buf pointer will be updated to point beyond
5523 * the end of the found token.
5524 *
5525 * Note: uses GFP_KERNEL for allocation.
5526 */
5527static inline char *dup_token(const char **buf, size_t *lenp)
5528{
5529 char *dup;
5530 size_t len;
5531
5532 len = next_token(buf);
4caf35f9 5533 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
5534 if (!dup)
5535 return NULL;
ea3352f4
AE
5536 *(dup + len) = '\0';
5537 *buf += len;
5538
5539 if (lenp)
5540 *lenp = len;
5541
5542 return dup;
5543}
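/*
 * Usage sketch for next_token()/dup_token() (input is hypothetical):
 *
 *	const char *buf = " 1.2.3.4:6789 rbd foo";
 *	size_t len = next_token(&buf);	// buf -> "1.2.3.4:...", len == 12
 *	char *pool;
 *	buf += len;			// step over the monitor list
 *	pool = dup_token(&buf, NULL);	// pool == "rbd", buf advanced
 *	kfree(pool);
 */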
5544
a725f65e 5545/*
859c31df
AE
5546 * Parse the options provided for an "rbd add" (i.e., rbd image
5547 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
5548 * and the data written is passed here via a NUL-terminated buffer.
5549 * Returns 0 if successful or an error code otherwise.
d22f76e7 5550 *
859c31df
AE
5551 * The information extracted from these options is recorded in
5552 * the other parameters which return dynamically-allocated
5553 * structures:
5554 * ceph_opts
5555 * The address of a pointer that will refer to a ceph options
5556 * structure. Caller must release the returned pointer using
5557 * ceph_destroy_options() when it is no longer needed.
5558 * rbd_opts
5559 * Address of an rbd options pointer. Fully initialized by
5560 * this function; caller must release with kfree().
5561 * spec
5562 * Address of an rbd image specification pointer. Fully
5563 * initialized by this function based on parsed options.
5564 * Caller must release with rbd_spec_put().
5565 *
5566 * The options passed take this form:
5567 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
5568 * where:
5569 * <mon_addrs>
5570 * A comma-separated list of one or more monitor addresses.
5571 * A monitor address is an ip address, optionally followed
5572 * by a port number (separated by a colon).
5573 * I.e.: ip1[:port1][,ip2[:port2]...]
5574 * <options>
5575 * A comma-separated list of ceph and/or rbd options.
5576 * <pool_name>
5577 * The name of the rados pool containing the rbd image.
5578 * <image_name>
5579 * The name of the image in that pool to map.
5580 * <snap_id>
5581 * An optional snapshot id. If provided, the mapping will
5582 * present data from the image at the time that snapshot was
5583 * created. The image head is used if no snapshot id is
5584 * provided. Snapshot mappings are always read-only.
a725f65e 5585 */
859c31df 5586static int rbd_add_parse_args(const char *buf,
dc79b113 5587 struct ceph_options **ceph_opts,
859c31df
AE
5588 struct rbd_options **opts,
5589 struct rbd_spec **rbd_spec)
e28fff26 5590{
d22f76e7 5591 size_t len;
859c31df 5592 char *options;
0ddebc0c 5593 const char *mon_addrs;
ecb4dc22 5594 char *snap_name;
0ddebc0c 5595 size_t mon_addrs_size;
859c31df 5596 struct rbd_spec *spec = NULL;
4e9afeba 5597 struct rbd_options *rbd_opts = NULL;
859c31df 5598 struct ceph_options *copts;
dc79b113 5599 int ret;
e28fff26
AE
5600
5601 /* The first four tokens are required */
5602
7ef3214a 5603 len = next_token(&buf);
4fb5d671
AE
5604 if (!len) {
5605 rbd_warn(NULL, "no monitor address(es) provided");
5606 return -EINVAL;
5607 }
0ddebc0c 5608 mon_addrs = buf;
f28e565a 5609 mon_addrs_size = len + 1;
7ef3214a 5610 buf += len;
a725f65e 5611
dc79b113 5612 ret = -EINVAL;
f28e565a
AE
5613 options = dup_token(&buf, NULL);
5614 if (!options)
dc79b113 5615 return -ENOMEM;
4fb5d671
AE
5616 if (!*options) {
5617 rbd_warn(NULL, "no options provided");
5618 goto out_err;
5619 }
e28fff26 5620
859c31df
AE
5621 spec = rbd_spec_alloc();
5622 if (!spec)
f28e565a 5623 goto out_mem;
859c31df
AE
5624
5625 spec->pool_name = dup_token(&buf, NULL);
5626 if (!spec->pool_name)
5627 goto out_mem;
4fb5d671
AE
5628 if (!*spec->pool_name) {
5629 rbd_warn(NULL, "no pool name provided");
5630 goto out_err;
5631 }
e28fff26 5632
69e7a02f 5633 spec->image_name = dup_token(&buf, NULL);
859c31df 5634 if (!spec->image_name)
f28e565a 5635 goto out_mem;
4fb5d671
AE
5636 if (!*spec->image_name) {
5637 rbd_warn(NULL, "no image name provided");
5638 goto out_err;
5639 }
d4b125e9 5640
f28e565a
AE
5641 /*
5642 * Snapshot name is optional; default is to use "-"
5643 * (indicating the head/no snapshot).
5644 */
3feeb894 5645 len = next_token(&buf);
820a5f3e 5646 if (!len) {
3feeb894
AE
5647 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5648 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 5649 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 5650 ret = -ENAMETOOLONG;
f28e565a 5651 goto out_err;
849b4260 5652 }
ecb4dc22
AE
5653 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5654 if (!snap_name)
f28e565a 5655 goto out_mem;
ecb4dc22
AE
5656 *(snap_name + len) = '\0';
5657 spec->snap_name = snap_name;
e5c35534 5658
0ddebc0c 5659 /* Initialize all rbd options to the defaults */
e28fff26 5660
4e9afeba
AE
5661 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5662 if (!rbd_opts)
5663 goto out_mem;
5664
5665 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
b5584180 5666 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
80de1912 5667 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
d22f76e7 5668
859c31df 5669 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 5670 mon_addrs + mon_addrs_size - 1,
4e9afeba 5671 parse_rbd_opts_token, rbd_opts);
859c31df
AE
5672 if (IS_ERR(copts)) {
5673 ret = PTR_ERR(copts);
dc79b113
AE
5674 goto out_err;
5675 }
859c31df
AE
5676 kfree(options);
5677
5678 *ceph_opts = copts;
4e9afeba 5679 *opts = rbd_opts;
859c31df 5680 *rbd_spec = spec;
0ddebc0c 5681
dc79b113 5682 return 0;
f28e565a 5683out_mem:
dc79b113 5684 ret = -ENOMEM;
d22f76e7 5685out_err:
859c31df
AE
5686 kfree(rbd_opts);
5687 rbd_spec_put(spec);
f28e565a 5688 kfree(options);
d22f76e7 5689
dc79b113 5690 return ret;
a725f65e
AE
5691}
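/*
 * Example (addresses and names hypothetical) of a buffer this parser
 * accepts when written to /sys/bus/rbd/add:
 *
 *	"1.2.3.4:6789,1.2.3.5:6789 name=admin,read_only rbd foo mysnap"
 *
 * mon_addrs is the comma-separated monitor list, the second token
 * carries ceph/rbd options, then pool "rbd", image "foo", and the
 * optional snapshot name "mysnap" (the head is mapped if omitted).
 */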
5692
30ba1f02
ID
5693/*
5694 * Return pool id (>= 0) or a negative error code.
5695 */
5696static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5697{
a319bf56 5698 struct ceph_options *opts = rbdc->client->options;
30ba1f02 5699 u64 newest_epoch;
30ba1f02
ID
5700 int tries = 0;
5701 int ret;
5702
5703again:
5704 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5705 if (ret == -ENOENT && tries++ < 1) {
d0b19705
ID
5706 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5707 &newest_epoch);
30ba1f02
ID
5708 if (ret < 0)
5709 return ret;
5710
5711 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
7cca78c9 5712 ceph_osdc_maybe_request_map(&rbdc->client->osdc);
30ba1f02 5713 (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
a319bf56
ID
5714 newest_epoch,
5715 opts->mount_timeout);
30ba1f02
ID
5716 goto again;
5717 } else {
5718 /* the osdmap we have is new enough */
5719 return -ENOENT;
5720 }
5721 }
5722
5723 return ret;
5724}
5725
589d30e0
AE
5726/*
5727 * An rbd format 2 image has a unique identifier, distinct from the
5728 * name given to it by the user. Internally, that identifier is
5729 * what's used to specify the names of objects related to the image.
5730 *
5731 * A special "rbd id" object is used to map an rbd image name to its
5732 * id. If that object doesn't exist, then there is no v2 rbd image
5733 * with the supplied name.
5734 *
5735 * This function will record the given rbd_dev's image_id field if
5736 * it can be determined, and in that case will return 0. If any
5737 * errors occur a negative errno will be returned and the rbd_dev's
5738 * image_id field will be unchanged (and should be NULL).
5739 */
5740static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5741{
5742 int ret;
5743 size_t size;
ecd4a68a 5744 CEPH_DEFINE_OID_ONSTACK(oid);
589d30e0 5745 void *response;
c0fba368 5746 char *image_id;
2f82ee54 5747
2c0d0a10
AE
5748 /*
5749 * When probing a parent image, the image id is already
5750 * known (and the image name likely is not). There's no
c0fba368
AE
5751 * need to fetch the image id again in this case. We
5752 * do still need to set the image format though.
2c0d0a10 5753 */
c0fba368
AE
5754 if (rbd_dev->spec->image_id) {
5755 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5756
2c0d0a10 5757 return 0;
c0fba368 5758 }
2c0d0a10 5759
589d30e0
AE
5760 /*
5761 * First, see if the format 2 image id file exists, and if
5762 * so, get the image's persistent id from it.
5763 */
ecd4a68a
ID
5764 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5765 rbd_dev->spec->image_name);
5766 if (ret)
5767 return ret;
5768
5769 dout("rbd id object name is %s\n", oid.name);
589d30e0
AE
5770
5771 /* Response will be an encoded string, which includes a length */
5772
5773 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5774 response = kzalloc(size, GFP_NOIO);
5775 if (!response) {
5776 ret = -ENOMEM;
5777 goto out;
5778 }
5779
c0fba368
AE
5780 /* If it doesn't exist we'll assume it's a format 1 image */
5781
ecd4a68a
ID
5782 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5783 "get_id", NULL, 0,
5784 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 5785 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
5786 if (ret == -ENOENT) {
5787 image_id = kstrdup("", GFP_KERNEL);
5788 ret = image_id ? 0 : -ENOMEM;
5789 if (!ret)
5790 rbd_dev->image_format = 1;
7dd440c9 5791 } else if (ret >= 0) {
c0fba368
AE
5792 void *p = response;
5793
5794 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 5795 NULL, GFP_NOIO);
461f758a 5796 ret = PTR_ERR_OR_ZERO(image_id);
c0fba368
AE
5797 if (!ret)
5798 rbd_dev->image_format = 2;
c0fba368
AE
5799 }
5800
5801 if (!ret) {
5802 rbd_dev->spec->image_id = image_id;
5803 dout("image_id is %s\n", image_id);
589d30e0
AE
5804 }
5805out:
5806 kfree(response);
ecd4a68a 5807 ceph_oid_destroy(&oid);
589d30e0
AE
5808 return ret;
5809}
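/*
 * Illustration (image name and id hypothetical): for image "foo" the
 * id object is named RBD_ID_PREFIX + "foo" (e.g. "rbd_id.foo" with the
 * usual prefix), and its "get_id" method returns an encoded string
 * such as "10052ae8944a", which is recorded as spec->image_id.  A v1
 * image has no such object, hence the -ENOENT special case above.
 */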
5810
3abef3b3
AE
5811/*
5812 * Undo whatever state changes are made by the v1 or v2 header
5813 * info call.
5814 */
6fd48b3b
AE
5815static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5816{
5817 struct rbd_image_header *header;
5818
e69b8d41 5819 rbd_dev_parent_put(rbd_dev);
6fd48b3b
AE
5820
5821 /* Free dynamic fields from the header, then zero it out */
5822
5823 header = &rbd_dev->header;
812164f8 5824 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
5825 kfree(header->snap_sizes);
5826 kfree(header->snap_names);
5827 kfree(header->object_prefix);
5828 memset(header, 0, sizeof (*header));
5829}
5830
2df3fac7 5831static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9
AE
5832{
5833 int ret;
a30b71b9 5834
1e130199 5835 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 5836 if (ret)
b1b5402a
AE
5837 goto out_err;
5838
2df3fac7
AE
5839 /*
5840 * Get and check the features for the image. Currently the
5841 * features are assumed to never change.
5842 */
b1b5402a 5843 ret = rbd_dev_v2_features(rbd_dev);
57385b51 5844 if (ret)
9d475de5 5845 goto out_err;
35d489f9 5846
cc070d59
AE
5847 /* If the image supports fancy striping, get its parameters */
5848
5849 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5850 ret = rbd_dev_v2_striping_info(rbd_dev);
5851 if (ret < 0)
5852 goto out_err;
5853 }
a30b71b9 5854
7e97332e
ID
5855 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5856 ret = rbd_dev_v2_data_pool(rbd_dev);
5857 if (ret)
5858 goto out_err;
5859 }
5860
263423f8 5861 rbd_init_layout(rbd_dev);
35152979 5862 return 0;
263423f8 5863
9d475de5 5864out_err:
642a2537 5865 rbd_dev->header.features = 0;
1e130199
AE
5866 kfree(rbd_dev->header.object_prefix);
5867 rbd_dev->header.object_prefix = NULL;
9d475de5 5868 return ret;
a30b71b9
AE
5869}
5870
6d69bb53
ID
5871/*
5872 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5873 * rbd_dev_image_probe() recursion depth, which means it's also the
5874 * length of the already discovered part of the parent chain.
5875 */
5876static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
83a06263 5877{
2f82ee54 5878 struct rbd_device *parent = NULL;
124afba2
AE
5879 int ret;
5880
5881 if (!rbd_dev->parent_spec)
5882 return 0;
124afba2 5883
6d69bb53
ID
5884 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5885 pr_info("parent chain is too long (%d)\n", depth);
5886 ret = -EINVAL;
5887 goto out_err;
5888 }
5889
1643dfa4 5890 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
1f2c6651
ID
5891 if (!parent) {
5892 ret = -ENOMEM;
124afba2 5893 goto out_err;
1f2c6651
ID
5894 }
5895
5896 /*
5897 * Images related by parent/child relationships always share
5898 * rbd_client and spec/parent_spec, so bump their refcounts.
5899 */
5900 __rbd_get_client(rbd_dev->rbd_client);
5901 rbd_spec_get(rbd_dev->parent_spec);
124afba2 5902
6d69bb53 5903 ret = rbd_dev_image_probe(parent, depth);
124afba2
AE
5904 if (ret < 0)
5905 goto out_err;
1f2c6651 5906
124afba2 5907 rbd_dev->parent = parent;
a2acd00e 5908 atomic_set(&rbd_dev->parent_ref, 1);
124afba2 5909 return 0;
1f2c6651 5910
124afba2 5911out_err:
1f2c6651 5912 rbd_dev_unparent(rbd_dev);
1761b229 5913 rbd_dev_destroy(parent);
124afba2
AE
5914 return ret;
5915}
5916
811c6688
ID
5917/*
5918 * rbd_dev->header_rwsem must be locked for write and will be unlocked
5919 * upon return.
5920 */
200a6a8b 5921static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 5922{
83a06263 5923 int ret;
d1cf5788 5924
9b60e70b 5925 /* Record our major and minor device numbers. */
83a06263 5926
9b60e70b
ID
5927 if (!single_major) {
5928 ret = register_blkdev(0, rbd_dev->name);
5929 if (ret < 0)
1643dfa4 5930 goto err_out_unlock;
9b60e70b
ID
5931
5932 rbd_dev->major = ret;
5933 rbd_dev->minor = 0;
5934 } else {
5935 rbd_dev->major = rbd_major;
5936 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5937 }
83a06263
AE
5938
5939 /* Set up the blkdev mapping. */
5940
5941 ret = rbd_init_disk(rbd_dev);
5942 if (ret)
5943 goto err_out_blkdev;
5944
f35a4dee 5945 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
5946 if (ret)
5947 goto err_out_disk;
bc1ecc65 5948
f35a4dee 5949 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
22001f61 5950 set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
f35a4dee 5951
dd5ac32d
ID
5952 dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
5953 ret = device_add(&rbd_dev->dev);
f35a4dee 5954 if (ret)
f5ee37bd 5955 goto err_out_mapping;
83a06263 5956
83a06263
AE
5957 /* Everything's ready. Announce the disk to the world. */
5958
129b79d4 5959 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
811c6688 5960 up_write(&rbd_dev->header_rwsem);
83a06263 5961
1643dfa4
ID
5962 spin_lock(&rbd_dev_list_lock);
5963 list_add_tail(&rbd_dev->node, &rbd_dev_list);
5964 spin_unlock(&rbd_dev_list_lock);
5965
811c6688 5966 add_disk(rbd_dev->disk);
ca7909e8
ID
5967 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
5968 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
5969 rbd_dev->header.features);
83a06263
AE
5970
5971 return ret;
2f82ee54 5972
f35a4dee
AE
5973err_out_mapping:
5974 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
5975err_out_disk:
5976 rbd_free_disk(rbd_dev);
5977err_out_blkdev:
9b60e70b
ID
5978 if (!single_major)
5979 unregister_blkdev(rbd_dev->major, rbd_dev->name);
811c6688
ID
5980err_out_unlock:
5981 up_write(&rbd_dev->header_rwsem);
83a06263
AE
5982 return ret;
5983}
5984
332bb12d
AE
5985static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5986{
5987 struct rbd_spec *spec = rbd_dev->spec;
c41d13a3 5988 int ret;
332bb12d
AE
5989
5990 /* Record the header object name for this rbd image. */
5991
5992 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
332bb12d 5993 if (rbd_dev->image_format == 1)
c41d13a3
ID
5994 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5995 spec->image_name, RBD_SUFFIX);
332bb12d 5996 else
c41d13a3
ID
5997 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5998 RBD_HEADER_PREFIX, spec->image_id);
332bb12d 5999
c41d13a3 6000 return ret;
332bb12d
AE
6001}
6002
200a6a8b
AE
6003static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6004{
6fd48b3b 6005 rbd_dev_unprobe(rbd_dev);
6fd48b3b
AE
6006 rbd_dev->image_format = 0;
6007 kfree(rbd_dev->spec->image_id);
6008 rbd_dev->spec->image_id = NULL;
6009
200a6a8b
AE
6010 rbd_dev_destroy(rbd_dev);
6011}
6012
a30b71b9
AE
6013/*
6014 * Probe for the existence of the header object for the given rbd
1f3ef788
AE
6015 * device. If this image is the one being mapped (i.e., not a
6016 * parent), initiate a watch on its header object before using that
6017 * object to get detailed information about the rbd image.
a30b71b9 6018 */
6d69bb53 6019static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
a30b71b9
AE
6020{
6021 int ret;
6022
6023 /*
3abef3b3
AE
6024 * Get the id from the image id object. Unless there's an
6025 * error, rbd_dev->spec->image_id will be filled in with
6026 * a dynamically-allocated string, and rbd_dev->image_format
6027 * will be set to either 1 or 2.
a30b71b9
AE
6028 */
6029 ret = rbd_dev_image_id(rbd_dev);
6030 if (ret)
c0fba368 6031 return ret;
c0fba368 6032
332bb12d
AE
6033 ret = rbd_dev_header_name(rbd_dev);
6034 if (ret)
6035 goto err_out_format;
6036
6d69bb53 6037 if (!depth) {
99d16943 6038 ret = rbd_register_watch(rbd_dev);
1fe48023
ID
6039 if (ret) {
6040 if (ret == -ENOENT)
6041 pr_info("image %s/%s does not exist\n",
6042 rbd_dev->spec->pool_name,
6043 rbd_dev->spec->image_name);
c41d13a3 6044 goto err_out_format;
1fe48023 6045 }
1f3ef788 6046 }
b644de2b 6047
a720ae09 6048 ret = rbd_dev_header_info(rbd_dev);
5655c4d9 6049 if (ret)
b644de2b 6050 goto err_out_watch;
83a06263 6051
04077599
ID
6052 /*
6053 * If this image is the one being mapped, we have pool name and
6054 * id, image name and id, and snap name - need to fill snap id.
6055 * Otherwise this is a parent image, identified by pool, image
6056 * and snap ids - need to fill in names for those ids.
6057 */
6d69bb53 6058 if (!depth)
04077599
ID
6059 ret = rbd_spec_fill_snap_id(rbd_dev);
6060 else
6061 ret = rbd_spec_fill_names(rbd_dev);
1fe48023
ID
6062 if (ret) {
6063 if (ret == -ENOENT)
6064 pr_info("snap %s/%s@%s does not exist\n",
6065 rbd_dev->spec->pool_name,
6066 rbd_dev->spec->image_name,
6067 rbd_dev->spec->snap_name);
33dca39f 6068 goto err_out_probe;
1fe48023 6069 }
9bb81c9b 6070
e8f59b59
ID
6071 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6072 ret = rbd_dev_v2_parent_info(rbd_dev);
6073 if (ret)
6074 goto err_out_probe;
6075
6076 /*
6077 * Need to warn users if this image is the one being
6078 * mapped and has a parent.
6079 */
6d69bb53 6080 if (!depth && rbd_dev->parent_spec)
e8f59b59
ID
6081 rbd_warn(rbd_dev,
6082 "WARNING: kernel layering is EXPERIMENTAL!");
6083 }
6084
6d69bb53 6085 ret = rbd_dev_probe_parent(rbd_dev, depth);
30d60ba2
AE
6086 if (ret)
6087 goto err_out_probe;
6088
6089 dout("discovered format %u image, header name is %s\n",
c41d13a3 6090 rbd_dev->image_format, rbd_dev->header_oid.name);
30d60ba2 6091 return 0;
e8f59b59 6092
6fd48b3b
AE
6093err_out_probe:
6094 rbd_dev_unprobe(rbd_dev);
b644de2b 6095err_out_watch:
6d69bb53 6096 if (!depth)
99d16943 6097 rbd_unregister_watch(rbd_dev);
332bb12d
AE
6098err_out_format:
6099 rbd_dev->image_format = 0;
5655c4d9
AE
6100 kfree(rbd_dev->spec->image_id);
6101 rbd_dev->spec->image_id = NULL;
a30b71b9
AE
6102 return ret;
6103}
6104
9b60e70b
ID
6105static ssize_t do_rbd_add(struct bus_type *bus,
6106 const char *buf,
6107 size_t count)
602adf40 6108{
cb8627c7 6109 struct rbd_device *rbd_dev = NULL;
dc79b113 6110 struct ceph_options *ceph_opts = NULL;
4e9afeba 6111 struct rbd_options *rbd_opts = NULL;
859c31df 6112 struct rbd_spec *spec = NULL;
9d3997fd 6113 struct rbd_client *rbdc;
51344a38 6114 bool read_only;
b51c83c2 6115 int rc;
602adf40
YS
6116
6117 if (!try_module_get(THIS_MODULE))
6118 return -ENODEV;
6119
602adf40 6120 /* parse add command */
859c31df 6121 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 6122 if (rc < 0)
dd5ac32d 6123 goto out;
78cea76e 6124
9d3997fd
AE
6125 rbdc = rbd_get_client(ceph_opts);
6126 if (IS_ERR(rbdc)) {
6127 rc = PTR_ERR(rbdc);
0ddebc0c 6128 goto err_out_args;
9d3997fd 6129 }
602adf40 6130
602adf40 6131 /* pick the pool */
30ba1f02 6132 rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
1fe48023
ID
6133 if (rc < 0) {
6134 if (rc == -ENOENT)
6135 pr_info("pool %s does not exist\n", spec->pool_name);
602adf40 6136 goto err_out_client;
1fe48023 6137 }
c0cd10db 6138 spec->pool_id = (u64)rc;
859c31df 6139
d147543d 6140 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
b51c83c2
ID
6141 if (!rbd_dev) {
6142 rc = -ENOMEM;
bd4ba655 6143 goto err_out_client;
b51c83c2 6144 }
c53d5893
AE
6145 rbdc = NULL; /* rbd_dev now owns this */
6146 spec = NULL; /* rbd_dev now owns this */
d147543d 6147 rbd_opts = NULL; /* rbd_dev now owns this */
602adf40 6148
0d6d1e9c
MC
6149 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6150 if (!rbd_dev->config_info) {
6151 rc = -ENOMEM;
6152 goto err_out_rbd_dev;
6153 }
6154
811c6688 6155 down_write(&rbd_dev->header_rwsem);
6d69bb53 6156 rc = rbd_dev_image_probe(rbd_dev, 0);
0d6d1e9c
MC
6157 if (rc < 0) {
6158 up_write(&rbd_dev->header_rwsem);
c53d5893 6159 goto err_out_rbd_dev;
0d6d1e9c 6160 }
05fd6f6f 6161
7ce4eef7
AE
6162 /* If we are mapping a snapshot it must be marked read-only */
6163
d147543d 6164 read_only = rbd_dev->opts->read_only;
7ce4eef7
AE
6165 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6166 read_only = true;
6167 rbd_dev->mapping.read_only = read_only;
6168
b536f69a 6169 rc = rbd_dev_device_setup(rbd_dev);
3abef3b3 6170 if (rc) {
e37180c0 6171 /*
99d16943 6172 * rbd_unregister_watch() can't be moved into
e37180c0
ID
6173 * rbd_dev_image_release() without refactoring, see
6174 * commit 1f3ef78861ac.
6175 */
99d16943 6176 rbd_unregister_watch(rbd_dev);
3abef3b3 6177 rbd_dev_image_release(rbd_dev);
dd5ac32d 6178 goto out;
3abef3b3
AE
6179 }
6180
dd5ac32d
ID
6181 rc = count;
6182out:
6183 module_put(THIS_MODULE);
6184 return rc;
b536f69a 6185
c53d5893
AE
6186err_out_rbd_dev:
6187 rbd_dev_destroy(rbd_dev);
bd4ba655 6188err_out_client:
9d3997fd 6189 rbd_put_client(rbdc);
0ddebc0c 6190err_out_args:
859c31df 6191 rbd_spec_put(spec);
d147543d 6192 kfree(rbd_opts);
dd5ac32d 6193 goto out;
602adf40
YS
6194}
6195
9b60e70b
ID
6196static ssize_t rbd_add(struct bus_type *bus,
6197 const char *buf,
6198 size_t count)
6199{
6200 if (single_major)
6201 return -EINVAL;
6202
6203 return do_rbd_add(bus, buf, count);
6204}
6205
6206static ssize_t rbd_add_single_major(struct bus_type *bus,
6207 const char *buf,
6208 size_t count)
6209{
6210 return do_rbd_add(bus, buf, count);
6211}
6212
dd5ac32d 6213static void rbd_dev_device_release(struct rbd_device *rbd_dev)
602adf40 6214{
602adf40 6215 rbd_free_disk(rbd_dev);
1643dfa4
ID
6216
6217 spin_lock(&rbd_dev_list_lock);
6218 list_del_init(&rbd_dev->node);
6219 spin_unlock(&rbd_dev_list_lock);
6220
200a6a8b 6221 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
dd5ac32d 6222 device_del(&rbd_dev->dev);
6d80b130 6223 rbd_dev_mapping_clear(rbd_dev);
9b60e70b
ID
6224 if (!single_major)
6225 unregister_blkdev(rbd_dev->major, rbd_dev->name);
602adf40
YS
6226}
6227
05a46afd
AE
6228static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6229{
ad945fc1 6230 while (rbd_dev->parent) {
05a46afd
AE
6231 struct rbd_device *first = rbd_dev;
6232 struct rbd_device *second = first->parent;
6233 struct rbd_device *third;
6234
6235 /*
6236 * Follow to the parent with no grandparent and
6237 * remove it.
6238 */
6239 while (second && (third = second->parent)) {
6240 first = second;
6241 second = third;
6242 }
ad945fc1 6243 rbd_assert(second);
8ad42cd0 6244 rbd_dev_image_release(second);
ad945fc1
AE
6245 first->parent = NULL;
6246 first->parent_overlap = 0;
6247
6248 rbd_assert(first->parent_spec);
05a46afd
AE
6249 rbd_spec_put(first->parent_spec);
6250 first->parent_spec = NULL;
05a46afd
AE
6251 }
6252}
6253
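/*
 * Illustration (an inference from the loop above): for a chain
 * A -> B -> C, with A the mapped device and C the eldest ancestor,
 * each pass walks to the parent that has no grandparent and releases
 * it, so images are torn down eldest first: C, then B.
 */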
9b60e70b
ID
6254static ssize_t do_rbd_remove(struct bus_type *bus,
6255 const char *buf,
6256 size_t count)
602adf40
YS
6257{
6258 struct rbd_device *rbd_dev = NULL;
751cc0e3
AE
6259 struct list_head *tmp;
6260 int dev_id;
0276dca6 6261 char opt_buf[6];
82a442d2 6262 bool already = false;
0276dca6 6263 bool force = false;
0d8189e1 6264 int ret;
602adf40 6265
0276dca6
MC
6266 dev_id = -1;
6267 opt_buf[0] = '\0';
6268 sscanf(buf, "%d %5s", &dev_id, opt_buf);
6269 if (dev_id < 0) {
6270 pr_err("dev_id out of range\n");
602adf40 6271 return -EINVAL;
0276dca6
MC
6272 }
6273 if (opt_buf[0] != '\0') {
6274 if (!strcmp(opt_buf, "force")) {
6275 force = true;
6276 } else {
6277 pr_err("bad remove option at '%s'\n", opt_buf);
6278 return -EINVAL;
6279 }
6280 }
602adf40 6281
751cc0e3
AE
6282 ret = -ENOENT;
6283 spin_lock(&rbd_dev_list_lock);
6284 list_for_each(tmp, &rbd_dev_list) {
6285 rbd_dev = list_entry(tmp, struct rbd_device, node);
6286 if (rbd_dev->dev_id == dev_id) {
6287 ret = 0;
6288 break;
6289 }
42382b70 6290 }
751cc0e3
AE
6291 if (!ret) {
6292 spin_lock_irq(&rbd_dev->lock);
0276dca6 6293 if (rbd_dev->open_count && !force)
751cc0e3
AE
6294 ret = -EBUSY;
6295 else
82a442d2
AE
6296 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6297 &rbd_dev->flags);
751cc0e3
AE
6298 spin_unlock_irq(&rbd_dev->lock);
6299 }
6300 spin_unlock(&rbd_dev_list_lock);
82a442d2 6301 if (ret < 0 || already)
1ba0f1e7 6302 return ret;
751cc0e3 6303
0276dca6
MC
6304 if (force) {
6305 /*
6306 * Prevent new IO from being queued and wait for existing
6307 * IO to complete/fail.
6308 */
6309 blk_mq_freeze_queue(rbd_dev->disk->queue);
6310 blk_set_queue_dying(rbd_dev->disk->queue);
6311 }
6312
ed95b21a
ID
6313 down_write(&rbd_dev->lock_rwsem);
6314 if (__rbd_is_lock_owner(rbd_dev))
6315 rbd_unlock(rbd_dev);
6316 up_write(&rbd_dev->lock_rwsem);
99d16943 6317 rbd_unregister_watch(rbd_dev);
fca27065 6318
9875201e
JD
6319 /*
6320 * Don't free anything from rbd_dev->disk until after all
6321 * notifies are completely processed. Otherwise
6322 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
6323 * in a potential use after free of rbd_dev->disk or rbd_dev.
6324 */
dd5ac32d 6325 rbd_dev_device_release(rbd_dev);
8ad42cd0 6326 rbd_dev_image_release(rbd_dev);
aafb230e 6327
1ba0f1e7 6328 return count;
602adf40
YS
6329}
6330
9b60e70b
ID
6331static ssize_t rbd_remove(struct bus_type *bus,
6332 const char *buf,
6333 size_t count)
6334{
6335 if (single_major)
6336 return -EINVAL;
6337
6338 return do_rbd_remove(bus, buf, count);
6339}
6340
6341static ssize_t rbd_remove_single_major(struct bus_type *bus,
6342 const char *buf,
6343 size_t count)
6344{
6345 return do_rbd_remove(bus, buf, count);
6346}
6347
602adf40
YS
6348/*
6349 * create control files in sysfs
dfc5606d 6350 * /sys/bus/rbd/...
602adf40
YS
6351 */
6352static int rbd_sysfs_init(void)
6353{
dfc5606d 6354 int ret;
602adf40 6355
fed4c143 6356 ret = device_register(&rbd_root_dev);
21079786 6357 if (ret < 0)
dfc5606d 6358 return ret;
602adf40 6359
fed4c143
AE
6360 ret = bus_register(&rbd_bus_type);
6361 if (ret < 0)
6362 device_unregister(&rbd_root_dev);
602adf40 6363
602adf40
YS
6364 return ret;
6365}
6366
6367static void rbd_sysfs_cleanup(void)
6368{
dfc5606d 6369 bus_unregister(&rbd_bus_type);
fed4c143 6370 device_unregister(&rbd_root_dev);
602adf40
YS
6371}
6372
1c2a9dfe
AE
6373static int rbd_slab_init(void)
6374{
6375 rbd_assert(!rbd_img_request_cache);
03d94406 6376 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
868311b1
AE
6377 if (!rbd_img_request_cache)
6378 return -ENOMEM;
6379
6380 rbd_assert(!rbd_obj_request_cache);
03d94406 6381 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
78c2a44a
AE
6382 if (!rbd_obj_request_cache)
6383 goto out_err;
6384
6385 rbd_assert(!rbd_segment_name_cache);
6386 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
2d0ebc5d 6387 CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
78c2a44a 6388 if (rbd_segment_name_cache)
1c2a9dfe 6389 return 0;
78c2a44a 6390out_err:
13bf2834
JL
6391 kmem_cache_destroy(rbd_obj_request_cache);
6392 rbd_obj_request_cache = NULL;
1c2a9dfe 6393
868311b1
AE
6394 kmem_cache_destroy(rbd_img_request_cache);
6395 rbd_img_request_cache = NULL;
6396
1c2a9dfe
AE
6397 return -ENOMEM;
6398}
6399
6400static void rbd_slab_exit(void)
6401{
78c2a44a
AE
6402 rbd_assert(rbd_segment_name_cache);
6403 kmem_cache_destroy(rbd_segment_name_cache);
6404 rbd_segment_name_cache = NULL;
6405
868311b1
AE
6406 rbd_assert(rbd_obj_request_cache);
6407 kmem_cache_destroy(rbd_obj_request_cache);
6408 rbd_obj_request_cache = NULL;
6409
1c2a9dfe
AE
6410 rbd_assert(rbd_img_request_cache);
6411 kmem_cache_destroy(rbd_img_request_cache);
6412 rbd_img_request_cache = NULL;
6413}
6414
cc344fa1 6415static int __init rbd_init(void)
602adf40
YS
6416{
6417 int rc;
6418
1e32d34c
AE
6419 if (!libceph_compatible(NULL)) {
6420 rbd_warn(NULL, "libceph incompatibility (quitting)");
1e32d34c
AE
6421 return -EINVAL;
6422 }
e1b4d96d 6423
1c2a9dfe 6424 rc = rbd_slab_init();
602adf40
YS
6425 if (rc)
6426 return rc;
e1b4d96d 6427
f5ee37bd
ID
6428 /*
6429 * The number of active work items is limited by the number of
f77303bd 6430 * rbd devices * queue depth, so leave @max_active at default.
f5ee37bd
ID
6431 */
6432 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6433 if (!rbd_wq) {
6434 rc = -ENOMEM;
6435 goto err_out_slab;
6436 }
6437
9b60e70b
ID
6438 if (single_major) {
6439 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6440 if (rbd_major < 0) {
6441 rc = rbd_major;
f5ee37bd 6442 goto err_out_wq;
9b60e70b
ID
6443 }
6444 }
6445
1c2a9dfe
AE
6446 rc = rbd_sysfs_init();
6447 if (rc)
9b60e70b
ID
6448 goto err_out_blkdev;
6449
6450 if (single_major)
6451 pr_info("loaded (major %d)\n", rbd_major);
6452 else
6453 pr_info("loaded\n");
1c2a9dfe 6454
e1b4d96d
ID
6455 return 0;
6456
9b60e70b
ID
6457err_out_blkdev:
6458 if (single_major)
6459 unregister_blkdev(rbd_major, RBD_DRV_NAME);
f5ee37bd
ID
6460err_out_wq:
6461 destroy_workqueue(rbd_wq);
e1b4d96d
ID
6462err_out_slab:
6463 rbd_slab_exit();
1c2a9dfe 6464 return rc;
602adf40
YS
6465}
6466
cc344fa1 6467static void __exit rbd_exit(void)
602adf40 6468{
ffe312cf 6469 ida_destroy(&rbd_dev_id_ida);
602adf40 6470 rbd_sysfs_cleanup();
9b60e70b
ID
6471 if (single_major)
6472 unregister_blkdev(rbd_major, RBD_DRV_NAME);
f5ee37bd 6473 destroy_workqueue(rbd_wq);
1c2a9dfe 6474 rbd_slab_exit();
602adf40
YS
6475}
6476
6477module_init(rbd_init);
6478module_exit(rbd_exit);
6479
d552c619 6480MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
602adf40
YS
6481MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6482MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
602adf40
YS
6483/* following authorship retained from original osdblk.c */
6484MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6485
90da258b 6486MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
602adf40 6487MODULE_LICENSE("GPL");