/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value it returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}
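
/*
 * Usage sketch (illustrative, not part of the driver): these helpers
 * back rbd_dev->parent_ref below.  A counter pinned at 0 stays at 0,
 * and an overflowed counter reports -EINVAL instead of wrapping, so a
 * caller can distinguish "got a reference" from both failure modes:
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0)
 *		... parent reference acquired ...
 */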
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING		(1<<0)
#define RBD_FEATURE_STRIPINGV2		(1<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1<<2)
#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
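
/*
 * Identity sketch (all values hypothetical): mapping pool "rbd",
 * image "foo" at snapshot "snap1" might fill a spec as
 *
 *	pool_id  = 2			pool_name  = "rbd"
 *	image_id = "1022ae8944a"	image_name = "foo"
 *	snap_id  = 4			snap_name  = "snap1"
 *
 * For a mapping of the image head rather than a snapshot, snap_id is
 * CEPH_NOSNAP and the snapshot name is RBD_SNAP_HEAD_NAME ("-").
 */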
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
	OBJ_OP_WRITE,
	OBJ_OP_READ,
	OBJ_OP_DISCARD,
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};
struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};
enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
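
/*
 * Usage sketch (illustrative): the helpers above walk an image
 * request's list of object requests, e.g.
 *
 *	struct rbd_obj_request *obj_request;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		rbd_obj_request_submit(osdc, obj_request);
 *
 * for_each_obj_request_safe() walks the list in reverse and tolerates
 * removal of the current entry, which is how teardown paths use it.
 */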
enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};
/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};
/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};
static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * Default to false for now, as single-major requires >= 0.75 version of
 * userspace rbd utility.
 */
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
static int rbd_img_request_submit(struct rbd_img_request *img_request);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
static void rbd_spec_put(struct rbd_spec *spec);

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
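
/*
 * Worked example (illustrative): with RBD_SINGLE_MAJOR_PART_SHIFT == 4,
 * each device owns 16 minor numbers.  dev_id 3 maps to minor 48, and
 * minors 48..63 (the whole device plus its partitions) all map back to
 * dev_id 3.
 */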
static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
{
	return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
	       rbd_dev->spec->snap_id == CEPH_NOSNAP &&
	       !rbd_dev->mapping.read_only;
}

static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}
static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
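
/*
 * Userspace sketch (illustrative, values hypothetical): with these bus
 * attributes registered, images are mapped and unmapped through sysfs,
 * along the lines of
 *
 *	echo "1.2.3.4:6789 name=admin,secret=AQB... rbd foo -" \
 *		> /sys/bus/rbd/add
 *	echo 0 > /sys/bus/rbd/remove
 *
 * The authoritative format (monitor addresses, options, pool, image,
 * snapshot) is specified in Documentation/ABI/testing/sysfs-bus-rbd.
 */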
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}
static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ret = 0;
	int val;
	bool ro;
	bool ro_changed = false;

	/* get_user() may sleep, so call it before taking rbd_dev->lock */
	if (get_user(val, (int __user *)(arg)))
		return -EFAULT;

	ro = val ? true : false;
	/* Snapshots can't be written to */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	/* prevent others from opening this device */
	if (rbd_dev->open_count > 1) {
		ret = -EBUSY;
		goto out;
	}

	if (rbd_dev->mapping.read_only != ro) {
		rbd_dev->mapping.read_only = ro;
		ro_changed = true;
	}

out:
	spin_unlock_irq(&rbd_dev->lock);
	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
	if (ret == 0 && ro_changed)
		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

	return ret;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret = 0;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};
/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}
static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
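
/*
 * Note on sharing (illustrative): two mappings against the same
 * cluster with identical options will find and reuse one ceph_client
 * here, so they share a single monitor/OSD session.  Passing the
 * libceph "noshare" option (CEPH_OPT_NOSHARE) defeats the lookup and
 * forces a fresh client per mapping.
 */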
/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_err, NULL}
};

struct rbd_options {
	int	queue_depth;
	bool	read_only;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT	false
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		rbd_opts->queue_depth = intval;
		break;
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}
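
/*
 * Example (illustrative): the per-device option string handed to this
 * parser is a comma-separated token list, e.g. "queue_depth=128,ro"
 * sets rbd_opts->queue_depth to 128 and marks the mapping read-only.
 * Tokens not in rbd_opts_tokens fall through to the default case and
 * are rejected.
 */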
static char *obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	default:
		return "???";
	}
}
874
602adf40
YS
875/*
876 * Get a ceph client with specific addr and configuration, if one does
7262cfca
AE
877 * not exist create it. Either way, ceph_opts is consumed by this
878 * function.
602adf40 879 */
9d3997fd 880static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
602adf40 881{
f8c38929 882 struct rbd_client *rbdc;
59c2be1e 883
cfbf6377 884 mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
1f7ba331 885 rbdc = rbd_client_find(ceph_opts);
9d3997fd 886 if (rbdc) /* using an existing client */
43ae4701 887 ceph_destroy_options(ceph_opts);
9d3997fd 888 else
f8c38929 889 rbdc = rbd_client_create(ceph_opts);
cfbf6377 890 mutex_unlock(&client_mutex);
602adf40 891
9d3997fd 892 return rbdc;
602adf40
YS
893}
/*
 * Destroy ceph client.  Takes rbd_client_list_lock itself to remove
 * the client from the client list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* The rest aren't used for format 1 images */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}
static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}
/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
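
/*
 * Worked example (illustrative): for snapc->snaps = { 12, 7, 4, 1 }
 * (newest id first), rbd_dev_snap_index() returns 2 for snap_id 4 and
 * BAD_SNAP_INDEX for snap_id 5.  snapid_compare_reverse() inverts the
 * usual comparison so bsearch() can handle the descending array.
 */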
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}
static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}
static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;
	char *name_format;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	name_format = "%s.%012llx";
	if (rbd_dev->image_format == 2)
		name_format = "%s.%016llx";
	ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		rbd_segment_name_free(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}
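
/*
 * Worked example (illustrative, assuming the common obj_order of 22,
 * i.e. 4 MiB objects, and an object_prefix of "rb.0.1234"): image byte
 * offset 0x900000 lands in segment 2, so a format 1 image maps it to
 * object "rb.0.1234.000000000002" at in-object offset 0x100000.  A
 * 6 MiB request starting there is clipped by rbd_segment_length() to
 * the 3 MiB remaining in that object; the rest becomes further
 * per-object requests.
 */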
/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec bv;
	struct bvec_iter iter;
	unsigned long flags;
	void *buf;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, iter) {
			if (pos + bv.bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(&bv, &flags);
				memset(buf + remainder, 0,
				       bv.bv_len - remainder);
				flush_dcache_page(bv.bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv.bv_len;
		}

		chain = chain->bi_next;
	}
}
/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bio;

	bio = bio_clone(bio_src, gfpmask);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio_advance(bio, offset);
	bio->bi_iter.bi_size = len;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_iter.bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_iter.bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
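
/*
 * Worked example (illustrative): given a chain of two 4 KiB bios,
 * calling bio_chain_clone_range() with *offset == 3072 and len == 2048
 * clones the last 1 KiB of the first bio and the first 1 KiB of the
 * second.  On return *bio_src points at the second bio and *offset is
 * 1024, ready for a subsequent clone to continue from there.
 */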
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;

	return obj_request->img_offset <
	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
}
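
/*
 * Worked example (illustrative): with 4 MiB objects and a parent
 * overlap of 5 MiB, round_up() yields 8 MiB, so object requests with
 * img_offset below 8 MiB are treated as possibly overlapping the
 * parent image.  The partially-covered object spanning 4..8 MiB still
 * carries parent data in its 4..5 MiB range, which is why the overlap
 * is rounded up to a whole object.
 */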
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s %p\n", __func__, obj_request);
	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
{
	dout("%s %p\n", __func__, obj_request);
	ceph_osdc_cancel_request(obj_request->osd_req);
}

/*
 * Wait for an object request to complete.  If interrupted, cancel the
 * underlying osd request.
 *
 * @timeout: in jiffies, 0 means "wait forever"
 */
static int __rbd_obj_request_wait(struct rbd_obj_request *obj_request,
				  unsigned long timeout)
{
	long ret;

	dout("%s %p\n", __func__, obj_request);
	ret = wait_for_completion_interruptible_timeout(
					&obj_request->completion,
					ceph_timeout_jiffies(timeout));
	if (ret <= 0) {
		if (ret == 0)
			ret = -ETIMEDOUT;
		rbd_obj_request_end(obj_request);
	} else {
		ret = 0;
	}

	dout("%s %p ret %d\n", __func__, obj_request, (int)ret);
	return ret;
}

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	return __rbd_obj_request_wait(obj_request, 0);
}
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}
1683
0c425248
AE
1684/*
1685 * The default/initial value for all image request flags is 0. Each
1686 * is conditionally set to 1 at image request initialization time
1687 * and currently never change thereafter.
1688 */
1689static void img_request_write_set(struct rbd_img_request *img_request)
1690{
1691 set_bit(IMG_REQ_WRITE, &img_request->flags);
1692 smp_mb();
1693}
1694
1695static bool img_request_write_test(struct rbd_img_request *img_request)
1696{
1697 smp_mb();
1698 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1699}
1700
90e98c52
GZ
1701/*
1702 * Set the discard flag when the img_request is an discard request
1703 */
1704static void img_request_discard_set(struct rbd_img_request *img_request)
1705{
1706 set_bit(IMG_REQ_DISCARD, &img_request->flags);
1707 smp_mb();
1708}
1709
1710static bool img_request_discard_test(struct rbd_img_request *img_request)
1711{
1712 smp_mb();
1713 return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1714}
1715
9849e986
AE
1716static void img_request_child_set(struct rbd_img_request *img_request)
1717{
1718 set_bit(IMG_REQ_CHILD, &img_request->flags);
1719 smp_mb();
1720}
1721
e93f3152
AE
1722static void img_request_child_clear(struct rbd_img_request *img_request)
1723{
1724 clear_bit(IMG_REQ_CHILD, &img_request->flags);
1725 smp_mb();
1726}
1727
9849e986
AE
1728static bool img_request_child_test(struct rbd_img_request *img_request)
1729{
1730 smp_mb();
1731 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1732}
1733
d0b2e944
AE
1734static void img_request_layered_set(struct rbd_img_request *img_request)
1735{
1736 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1737 smp_mb();
1738}
1739
a2acd00e
AE
1740static void img_request_layered_clear(struct rbd_img_request *img_request)
1741{
1742 clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1743 smp_mb();
1744}
1745
d0b2e944
AE
1746static bool img_request_layered_test(struct rbd_img_request *img_request)
1747{
1748 smp_mb();
1749 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1750}
1751
3b434a2a
JD
1752static enum obj_operation_type
1753rbd_img_request_op_type(struct rbd_img_request *img_request)
1754{
1755 if (img_request_write_test(img_request))
1756 return OBJ_OP_WRITE;
1757 else if (img_request_discard_test(img_request))
1758 return OBJ_OP_DISCARD;
1759 else
1760 return OBJ_OP_READ;
1761}
1762
6e2a4505
AE
1763static void
1764rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1765{
b9434c5b
AE
1766 u64 xferred = obj_request->xferred;
1767 u64 length = obj_request->length;
1768
6e2a4505
AE
1769 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1770 obj_request, obj_request->img_request, obj_request->result,
b9434c5b 1771 xferred, length);
6e2a4505 1772 /*
1773 * ENOENT means a hole in the image. We zero-fill the entire
1774 * length of the request. A short read also implies zero-fill
1775 * to the end of the request. An error requires the whole
1776 * length of the request to be reported as finished with an error
1777 * to the block layer. In each case we update the xferred
1778 * count to indicate the whole request was satisfied.
6e2a4505 1779 */
b9434c5b 1780 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
6e2a4505 1781 if (obj_request->result == -ENOENT) {
b9434c5b
AE
1782 if (obj_request->type == OBJ_REQUEST_BIO)
1783 zero_bio_chain(obj_request->bio_list, 0);
1784 else
1785 zero_pages(obj_request->pages, 0, length);
6e2a4505 1786 obj_request->result = 0;
b9434c5b
AE
1787 } else if (xferred < length && !obj_request->result) {
1788 if (obj_request->type == OBJ_REQUEST_BIO)
1789 zero_bio_chain(obj_request->bio_list, xferred);
1790 else
1791 zero_pages(obj_request->pages, xferred, length);
6e2a4505 1792 }
17c1cc1d 1793 obj_request->xferred = length;
6e2a4505
AE
1794 obj_request_done_set(obj_request);
1795}
1796
bf0d5f50
AE
1797static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1798{
37206ee5
AE
1799 dout("%s: obj %p cb %p\n", __func__, obj_request,
1800 obj_request->callback);
bf0d5f50
AE
1801 if (obj_request->callback)
1802 obj_request->callback(obj_request);
788e2df3
AE
1803 else
1804 complete_all(&obj_request->completion);
bf0d5f50
AE
1805}
1806
c47f9371 1807static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1808{
57acbaa7 1809 struct rbd_img_request *img_request = NULL;
a9e8ba2c 1810 struct rbd_device *rbd_dev = NULL;
57acbaa7
AE
1811 bool layered = false;
1812
1813 if (obj_request_img_data_test(obj_request)) {
1814 img_request = obj_request->img_request;
1815 layered = img_request && img_request_layered_test(img_request);
a9e8ba2c 1816 rbd_dev = img_request->rbd_dev;
57acbaa7 1817 }
8b3e1a56
AE
1818
1819 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1820 obj_request, img_request, obj_request->result,
1821 obj_request->xferred, obj_request->length);
a9e8ba2c
AE
1822 if (layered && obj_request->result == -ENOENT &&
1823 obj_request->img_offset < rbd_dev->parent_overlap)
8b3e1a56
AE
1824 rbd_img_parent_read(obj_request);
1825 else if (img_request)
6e2a4505
AE
1826 rbd_img_obj_request_read_callback(obj_request);
1827 else
1828 obj_request_done_set(obj_request);
bf0d5f50
AE
1829}
1830
c47f9371 1831static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1832{
1b83bef2
SW
1833 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1834 obj_request->result, obj_request->length);
1835 /*
1836 * There is no such thing as a successful short write. Set
1837 * the transfer count to our originally-requested length.
1838 */
1839 obj_request->xferred = obj_request->length;
07741308 1840 obj_request_done_set(obj_request);
bf0d5f50
AE
1841}
1842
90e98c52
GZ
1843static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1844{
1845 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1846 obj_request->result, obj_request->length);
1847 /*
1848 * There is no such thing as a successful short discard. Set
1849 * the transfer count to our originally-requested length.
1850 */
1851 obj_request->xferred = obj_request->length;
d0265de7
JD
1852 /* discarding a non-existent object is not a problem */
1853 if (obj_request->result == -ENOENT)
1854 obj_request->result = 0;
90e98c52
GZ
1855 obj_request_done_set(obj_request);
1856}
1857
fbfab539
AE
1858/*
1859 * For a simple stat call there's nothing to do. We'll do more if
1860 * this is part of a write sequence for a layered image.
1861 */
c47f9371 1862static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
fbfab539 1863{
37206ee5 1864 dout("%s: obj %p\n", __func__, obj_request);
fbfab539
AE
1865 obj_request_done_set(obj_request);
1866}
1867
2761713d
ID
1868static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1869{
1870 dout("%s: obj %p\n", __func__, obj_request);
1871
1872 if (obj_request_img_data_test(obj_request))
1873 rbd_osd_copyup_callback(obj_request);
1874 else
1875 obj_request_done_set(obj_request);
1876}
1877
85e084fe 1878static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
bf0d5f50
AE
1879{
1880 struct rbd_obj_request *obj_request = osd_req->r_priv;
bf0d5f50
AE
1881 u16 opcode;
1882
85e084fe 1883 dout("%s: osd_req %p\n", __func__, osd_req);
bf0d5f50 1884 rbd_assert(osd_req == obj_request->osd_req);
57acbaa7
AE
1885 if (obj_request_img_data_test(obj_request)) {
1886 rbd_assert(obj_request->img_request);
1887 rbd_assert(obj_request->which != BAD_WHICH);
1888 } else {
1889 rbd_assert(obj_request->which == BAD_WHICH);
1890 }
bf0d5f50 1891
1b83bef2
SW
1892 if (osd_req->r_result < 0)
1893 obj_request->result = osd_req->r_result;
bf0d5f50 1894
c47f9371
AE
1895 /*
1896 * We support a 64-bit length, but ultimately it has to be
1897 * passed to the block layer, which just supports a 32-bit
1898 * length field.
c47f9371 1899 */
7665d85b 1900 obj_request->xferred = osd_req->r_ops[0].outdata_len;
8b3e1a56 1901 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
0ccd5926 1902
79528734 1903 opcode = osd_req->r_ops[0].op;
bf0d5f50
AE
1904 switch (opcode) {
1905 case CEPH_OSD_OP_READ:
c47f9371 1906 rbd_osd_read_callback(obj_request);
bf0d5f50 1907 break;
0ccd5926 1908 case CEPH_OSD_OP_SETALLOCHINT:
e30b7577
ID
1909 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1910 osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
0ccd5926 1911 /* fall through */
bf0d5f50 1912 case CEPH_OSD_OP_WRITE:
e30b7577 1913 case CEPH_OSD_OP_WRITEFULL:
c47f9371 1914 rbd_osd_write_callback(obj_request);
bf0d5f50 1915 break;
fbfab539 1916 case CEPH_OSD_OP_STAT:
c47f9371 1917 rbd_osd_stat_callback(obj_request);
fbfab539 1918 break;
90e98c52
GZ
1919 case CEPH_OSD_OP_DELETE:
1920 case CEPH_OSD_OP_TRUNCATE:
1921 case CEPH_OSD_OP_ZERO:
1922 rbd_osd_discard_callback(obj_request);
1923 break;
36be9a76 1924 case CEPH_OSD_OP_CALL:
2761713d
ID
1925 rbd_osd_call_callback(obj_request);
1926 break;
bf0d5f50 1927 default:
9584d508 1928 rbd_warn(NULL, "%s: unsupported op %hu",
1929 obj_request->object_name, (unsigned short) opcode);
1930 break;
1931 }
1932
07741308 1933 if (obj_request_done_test(obj_request))
bf0d5f50
AE
1934 rbd_obj_request_complete(obj_request);
1935}
1936
9d4df01f 1937static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
430c28c3
AE
1938{
1939 struct rbd_img_request *img_request = obj_request->img_request;
8c042b0d 1940 struct ceph_osd_request *osd_req = obj_request->osd_req;
430c28c3 1941
bb873b53
ID
1942 if (img_request)
1943 osd_req->r_snapid = img_request->snap_id;
9d4df01f
AE
1944}
1945
1946static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1947{
9d4df01f 1948 struct ceph_osd_request *osd_req = obj_request->osd_req;
9d4df01f 1949
bb873b53
ID
1950 osd_req->r_mtime = CURRENT_TIME;
1951 osd_req->r_data_offset = obj_request->offset;
430c28c3
AE
1952}
1953
0ccd5926
ID
1954/*
1955 * Create an osd request. A read request has one osd op (read).
1956 * A write request has either one (watch) or two (hint+write) osd ops.
1957 * (All rbd data writes are prefixed with an allocation hint op, but
1958 * technically osd watch is a write request, hence this distinction.)
1959 */
bf0d5f50
AE
1960static struct ceph_osd_request *rbd_osd_req_create(
1961 struct rbd_device *rbd_dev,
6d2940c8 1962 enum obj_operation_type op_type,
deb236b3 1963 unsigned int num_ops,
430c28c3 1964 struct rbd_obj_request *obj_request)
bf0d5f50 1965{
bf0d5f50
AE
1966 struct ceph_snap_context *snapc = NULL;
1967 struct ceph_osd_client *osdc;
1968 struct ceph_osd_request *osd_req;
bf0d5f50 1969
90e98c52
GZ
1970 if (obj_request_img_data_test(obj_request) &&
1971 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
6365d33a 1972 struct rbd_img_request *img_request = obj_request->img_request;
90e98c52
GZ
1973 if (op_type == OBJ_OP_WRITE) {
1974 rbd_assert(img_request_write_test(img_request));
1975 } else {
1976 rbd_assert(img_request_discard_test(img_request));
1977 }
6d2940c8 1978 snapc = img_request->snapc;
bf0d5f50
AE
1979 }
1980
6d2940c8 1981 rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
deb236b3
ID
1982
1983 /* Allocate and initialize the request, for the num_ops ops */
bf0d5f50
AE
1984
1985 osdc = &rbd_dev->rbd_client->client->osdc;
deb236b3 1986 osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
2224d879 1987 GFP_NOIO);
bf0d5f50 1988 if (!osd_req)
13d1ad16 1989 goto fail;
bf0d5f50 1990
90e98c52 1991 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
bf0d5f50 1992 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
430c28c3 1993 else
bf0d5f50 1994 osd_req->r_flags = CEPH_OSD_FLAG_READ;
bf0d5f50
AE
1995
1996 osd_req->r_callback = rbd_osd_req_callback;
1997 osd_req->r_priv = obj_request;
1998
7627151e 1999 osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
d30291b9
ID
2000 if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
2001 obj_request->object_name))
2002 goto fail;
bf0d5f50 2003
13d1ad16
ID
2004 if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
2005 goto fail;
2006
bf0d5f50 2007 return osd_req;
13d1ad16
ID
2008
2009fail:
2010 ceph_osdc_put_request(osd_req);
2011 return NULL;
bf0d5f50
AE
2012}
2013
0eefd470 2014/*
d3246fb0
JD
2015 * Create a copyup osd request based on the information in the object
2016 * request supplied. A copyup request has two or three osd ops, a
2017 * copyup method call, potentially a hint op, and a write or truncate
2018 * or zero op.
0eefd470
AE
2019 */
2020static struct ceph_osd_request *
2021rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
2022{
2023 struct rbd_img_request *img_request;
2024 struct ceph_snap_context *snapc;
2025 struct rbd_device *rbd_dev;
2026 struct ceph_osd_client *osdc;
2027 struct ceph_osd_request *osd_req;
d3246fb0 2028 int num_osd_ops = 3;
0eefd470
AE
2029
2030 rbd_assert(obj_request_img_data_test(obj_request));
2031 img_request = obj_request->img_request;
2032 rbd_assert(img_request);
d3246fb0
JD
2033 rbd_assert(img_request_write_test(img_request) ||
2034 img_request_discard_test(img_request));
0eefd470 2035
d3246fb0
JD
2036 if (img_request_discard_test(img_request))
2037 num_osd_ops = 2;
2038
2039 /* Allocate and initialize the request, for all the ops */
0eefd470
AE
2040
2041 snapc = img_request->snapc;
2042 rbd_dev = img_request->rbd_dev;
2043 osdc = &rbd_dev->rbd_client->client->osdc;
d3246fb0 2044 osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
2224d879 2045 false, GFP_NOIO);
0eefd470 2046 if (!osd_req)
13d1ad16 2047 goto fail;
0eefd470
AE
2048
2049 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
2050 osd_req->r_callback = rbd_osd_req_callback;
2051 osd_req->r_priv = obj_request;
2052
7627151e 2053 osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
d30291b9
ID
2054 if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
2055 obj_request->object_name))
2056 goto fail;
0eefd470 2057
13d1ad16
ID
2058 if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
2059 goto fail;
2060
0eefd470 2061 return osd_req;
13d1ad16
ID
2062
2063fail:
2064 ceph_osdc_put_request(osd_req);
2065 return NULL;
0eefd470
AE
2066}
2067
2068
bf0d5f50
AE
2069static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2070{
2071 ceph_osdc_put_request(osd_req);
2072}
2073
2074/* object_name is assumed to be a non-null pointer and NUL-terminated */
2075
2076static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
2077 u64 offset, u64 length,
2078 enum obj_request_type type)
2079{
2080 struct rbd_obj_request *obj_request;
2081 size_t size;
2082 char *name;
2083
2084 rbd_assert(obj_request_type_valid(type));
2085
2086 size = strlen(object_name) + 1;
5a60e876 2087 name = kmalloc(size, GFP_NOIO);
f907ad55 2088 if (!name)
bf0d5f50
AE
2089 return NULL;
2090
5a60e876 2091 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
f907ad55
AE
2092 if (!obj_request) {
2093 kfree(name);
2094 return NULL;
2095 }
2096
bf0d5f50
AE
2097 obj_request->object_name = memcpy(name, object_name, size);
2098 obj_request->offset = offset;
2099 obj_request->length = length;
926f9b3f 2100 obj_request->flags = 0;
bf0d5f50
AE
2101 obj_request->which = BAD_WHICH;
2102 obj_request->type = type;
2103 INIT_LIST_HEAD(&obj_request->links);
788e2df3 2104 init_completion(&obj_request->completion);
bf0d5f50
AE
2105 kref_init(&obj_request->kref);
2106
37206ee5
AE
2107 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
2108 offset, length, (int)type, obj_request);
2109
bf0d5f50
AE
2110 return obj_request;
2111}
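/*
 * Editor's illustrative sketch (not part of the driver): typical
 * create/put pairing for rbd_obj_request_create(). The object name
 * and extent here are made up for illustration.
 */
static int __maybe_unused example_create_obj_request(void)
{
	struct rbd_obj_request *obj_request;

	obj_request = rbd_obj_request_create("rb.0.1234.000000000000", 0,
					     4096, OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	/* ... set up obj_request->osd_req and submit it here ... */

	rbd_obj_request_put(obj_request);
	return 0;
}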
2112
2113static void rbd_obj_request_destroy(struct kref *kref)
2114{
2115 struct rbd_obj_request *obj_request;
2116
2117 obj_request = container_of(kref, struct rbd_obj_request, kref);
2118
37206ee5
AE
2119 dout("%s: obj %p\n", __func__, obj_request);
2120
bf0d5f50
AE
2121 rbd_assert(obj_request->img_request == NULL);
2122 rbd_assert(obj_request->which == BAD_WHICH);
2123
2124 if (obj_request->osd_req)
2125 rbd_osd_req_destroy(obj_request->osd_req);
2126
2127 rbd_assert(obj_request_type_valid(obj_request->type));
2128 switch (obj_request->type) {
9969ebc5
AE
2129 case OBJ_REQUEST_NODATA:
2130 break; /* Nothing to do */
bf0d5f50
AE
2131 case OBJ_REQUEST_BIO:
2132 if (obj_request->bio_list)
2133 bio_chain_put(obj_request->bio_list);
2134 break;
788e2df3
AE
2135 case OBJ_REQUEST_PAGES:
2136 if (obj_request->pages)
2137 ceph_release_page_vector(obj_request->pages,
2138 obj_request->page_count);
2139 break;
bf0d5f50
AE
2140 }
2141
f907ad55 2142 kfree(obj_request->object_name);
868311b1
AE
2143 obj_request->object_name = NULL;
2144 kmem_cache_free(rbd_obj_request_cache, obj_request);
bf0d5f50
AE
2145}
2146
fb65d228
AE
2147/* It's OK to call this for a device with no parent */
2148
2149static void rbd_spec_put(struct rbd_spec *spec);
2150static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2151{
2152 rbd_dev_remove_parent(rbd_dev);
2153 rbd_spec_put(rbd_dev->parent_spec);
2154 rbd_dev->parent_spec = NULL;
2155 rbd_dev->parent_overlap = 0;
2156}
2157
a2acd00e
AE
2158/*
2159 * Parent image reference counting is used to determine when an
2160 * image's parent fields can be safely torn down--after there are no
2161 * more in-flight requests to the parent image. When the last
2162 * reference is dropped, cleaning them up is safe.
2163 */
2164static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2165{
2166 int counter;
2167
2168 if (!rbd_dev->parent_spec)
2169 return;
2170
2171 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2172 if (counter > 0)
2173 return;
2174
2175 /* Last reference; clean up parent data structures */
2176
2177 if (!counter)
2178 rbd_dev_unparent(rbd_dev);
2179 else
9584d508 2180 rbd_warn(rbd_dev, "parent reference underflow");
a2acd00e
AE
2181}
2182
2183/*
2184 * If an image has a non-zero parent overlap, get a reference to its
2185 * parent.
2186 *
2187 * Returns true if the rbd device has a parent with a non-zero
2188 * overlap and a reference for it was successfully taken, or
2189 * false otherwise.
2190 */
2191static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2192{
ae43e9d0 2193 int counter = 0;
a2acd00e
AE
2194
2195 if (!rbd_dev->parent_spec)
2196 return false;
2197
ae43e9d0
ID
2198 down_read(&rbd_dev->header_rwsem);
2199 if (rbd_dev->parent_overlap)
2200 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2201 up_read(&rbd_dev->header_rwsem);
a2acd00e
AE
2202
2203 if (counter < 0)
9584d508 2204 rbd_warn(rbd_dev, "parent reference overflow");
a2acd00e 2205
ae43e9d0 2206 return counter > 0;
a2acd00e
AE
2207}
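/*
 * Editor's note (informational): every successful rbd_dev_parent_get()
 * must be balanced by an rbd_dev_parent_put() once the requests that
 * may touch the parent have completed; rbd_img_request_create() and
 * rbd_img_request_destroy() below form exactly such a pair via the
 * IMG_REQ_LAYERED flag.
 */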
2208
bf0d5f50
AE
2209/*
2210 * Caller is responsible for filling in the list of object requests
2211 * that make up the image request, and the Linux request pointer
2212 * (if there is one).
2213 */
cc344fa1
AE
2214static struct rbd_img_request *rbd_img_request_create(
2215 struct rbd_device *rbd_dev,
bf0d5f50 2216 u64 offset, u64 length,
6d2940c8 2217 enum obj_operation_type op_type,
4e752f0a 2218 struct ceph_snap_context *snapc)
bf0d5f50
AE
2219{
2220 struct rbd_img_request *img_request;
bf0d5f50 2221
7a716aac 2222 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
bf0d5f50
AE
2223 if (!img_request)
2224 return NULL;
2225
bf0d5f50
AE
2226 img_request->rq = NULL;
2227 img_request->rbd_dev = rbd_dev;
2228 img_request->offset = offset;
2229 img_request->length = length;
0c425248 2230 img_request->flags = 0;
90e98c52
GZ
2231 if (op_type == OBJ_OP_DISCARD) {
2232 img_request_discard_set(img_request);
2233 img_request->snapc = snapc;
2234 } else if (op_type == OBJ_OP_WRITE) {
0c425248 2235 img_request_write_set(img_request);
4e752f0a 2236 img_request->snapc = snapc;
0c425248 2237 } else {
bf0d5f50 2238 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 2239 }
a2acd00e 2240 if (rbd_dev_parent_get(rbd_dev))
d0b2e944 2241 img_request_layered_set(img_request);
bf0d5f50
AE
2242 spin_lock_init(&img_request->completion_lock);
2243 img_request->next_completion = 0;
2244 img_request->callback = NULL;
a5a337d4 2245 img_request->result = 0;
bf0d5f50
AE
2246 img_request->obj_request_count = 0;
2247 INIT_LIST_HEAD(&img_request->obj_requests);
2248 kref_init(&img_request->kref);
2249
37206ee5 2250 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
6d2940c8 2251 obj_op_name(op_type), offset, length, img_request);
37206ee5 2252
bf0d5f50
AE
2253 return img_request;
2254}
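/*
 * Editor's illustrative sketch (not part of the driver): creating a
 * plain read image request. Reads take no snapshot context, so snapc
 * is NULL; writes and discards pass the snapshot context instead (see
 * rbd_parent_request_create() below for a real in-tree caller).
 */
static struct rbd_img_request * __maybe_unused example_create_read_request(
				struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	return rbd_img_request_create(rbd_dev, offset, length,
				      OBJ_OP_READ, NULL);
}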
2255
2256static void rbd_img_request_destroy(struct kref *kref)
2257{
2258 struct rbd_img_request *img_request;
2259 struct rbd_obj_request *obj_request;
2260 struct rbd_obj_request *next_obj_request;
2261
2262 img_request = container_of(kref, struct rbd_img_request, kref);
2263
37206ee5
AE
2264 dout("%s: img %p\n", __func__, img_request);
2265
bf0d5f50
AE
2266 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2267 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 2268 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 2269
a2acd00e
AE
2270 if (img_request_layered_test(img_request)) {
2271 img_request_layered_clear(img_request);
2272 rbd_dev_parent_put(img_request->rbd_dev);
2273 }
2274
bef95455
JD
2275 if (img_request_write_test(img_request) ||
2276 img_request_discard_test(img_request))
812164f8 2277 ceph_put_snap_context(img_request->snapc);
bf0d5f50 2278
1c2a9dfe 2279 kmem_cache_free(rbd_img_request_cache, img_request);
bf0d5f50
AE
2280}
2281
e93f3152
AE
2282static struct rbd_img_request *rbd_parent_request_create(
2283 struct rbd_obj_request *obj_request,
2284 u64 img_offset, u64 length)
2285{
2286 struct rbd_img_request *parent_request;
2287 struct rbd_device *rbd_dev;
2288
2289 rbd_assert(obj_request->img_request);
2290 rbd_dev = obj_request->img_request->rbd_dev;
2291
4e752f0a 2292 parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
6d2940c8 2293 length, OBJ_OP_READ, NULL);
e93f3152
AE
2294 if (!parent_request)
2295 return NULL;
2296
2297 img_request_child_set(parent_request);
2298 rbd_obj_request_get(obj_request);
2299 parent_request->obj_request = obj_request;
2300
2301 return parent_request;
2302}
2303
2304static void rbd_parent_request_destroy(struct kref *kref)
2305{
2306 struct rbd_img_request *parent_request;
2307 struct rbd_obj_request *orig_request;
2308
2309 parent_request = container_of(kref, struct rbd_img_request, kref);
2310 orig_request = parent_request->obj_request;
2311
2312 parent_request->obj_request = NULL;
2313 rbd_obj_request_put(orig_request);
2314 img_request_child_clear(parent_request);
2315
2316 rbd_img_request_destroy(kref);
2317}
2318
1217857f
AE
2319static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2320{
6365d33a 2321 struct rbd_img_request *img_request;
1217857f
AE
2322 unsigned int xferred;
2323 int result;
8b3e1a56 2324 bool more;
1217857f 2325
6365d33a
AE
2326 rbd_assert(obj_request_img_data_test(obj_request));
2327 img_request = obj_request->img_request;
2328
1217857f
AE
2329 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2330 xferred = (unsigned int)obj_request->xferred;
2331 result = obj_request->result;
2332 if (result) {
2333 struct rbd_device *rbd_dev = img_request->rbd_dev;
6d2940c8
GZ
2334 enum obj_operation_type op_type;
2335
90e98c52
GZ
2336 if (img_request_discard_test(img_request))
2337 op_type = OBJ_OP_DISCARD;
2338 else if (img_request_write_test(img_request))
2339 op_type = OBJ_OP_WRITE;
2340 else
2341 op_type = OBJ_OP_READ;
1217857f 2342
9584d508 2343 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
6d2940c8
GZ
2344 obj_op_name(op_type), obj_request->length,
2345 obj_request->img_offset, obj_request->offset);
9584d508 2346 rbd_warn(rbd_dev, " result %d xferred %x",
1217857f
AE
2347 result, xferred);
2348 if (!img_request->result)
2349 img_request->result = result;
082a75da
ID
2350 /*
2351 * Need to end I/O on the entire obj_request worth of
2352 * bytes in case of error.
2353 */
2354 xferred = obj_request->length;
1217857f
AE
2355 }
2356
f1a4739f
AE
2357 /* Image object requests don't own their page array */
2358
2359 if (obj_request->type == OBJ_REQUEST_PAGES) {
2360 obj_request->pages = NULL;
2361 obj_request->page_count = 0;
2362 }
2363
8b3e1a56
AE
2364 if (img_request_child_test(img_request)) {
2365 rbd_assert(img_request->obj_request != NULL);
2366 more = obj_request->which < img_request->obj_request_count - 1;
2367 } else {
2368 rbd_assert(img_request->rq != NULL);
7ad18afa
CH
2369
2370 more = blk_update_request(img_request->rq, result, xferred);
2371 if (!more)
2372 __blk_mq_end_request(img_request->rq, result);
8b3e1a56
AE
2373 }
2374
2375 return more;
1217857f
AE
2376}
2377
2169238d
AE
2378static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2379{
2380 struct rbd_img_request *img_request;
2381 u32 which = obj_request->which;
2382 bool more = true;
2383
6365d33a 2384 rbd_assert(obj_request_img_data_test(obj_request));
2169238d
AE
2385 img_request = obj_request->img_request;
2386
2387 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2388 rbd_assert(img_request != NULL);
2169238d
AE
2389 rbd_assert(img_request->obj_request_count > 0);
2390 rbd_assert(which != BAD_WHICH);
2391 rbd_assert(which < img_request->obj_request_count);
2169238d
AE
2392
2393 spin_lock_irq(&img_request->completion_lock);
2394 if (which != img_request->next_completion)
2395 goto out;
2396
2397 for_each_obj_request_from(img_request, obj_request) {
2169238d
AE
2398 rbd_assert(more);
2399 rbd_assert(which < img_request->obj_request_count);
2400
2401 if (!obj_request_done_test(obj_request))
2402 break;
1217857f 2403 more = rbd_img_obj_end_request(obj_request);
2169238d
AE
2404 which++;
2405 }
2406
2407 rbd_assert(more ^ (which == img_request->obj_request_count));
2408 img_request->next_completion = which;
2409out:
2410 spin_unlock_irq(&img_request->completion_lock);
0f2d5be7 2411 rbd_img_request_put(img_request);
2169238d
AE
2412
2413 if (!more)
2414 rbd_img_request_complete(img_request);
2415}
2416
3b434a2a
JD
2417/*
2418 * Add individual osd ops to the given ceph_osd_request and prepare
2419 * them for submission. num_ops is the current number of
2420 * osd operations already added to the object request.
2421 */
2422static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2423 struct ceph_osd_request *osd_request,
2424 enum obj_operation_type op_type,
2425 unsigned int num_ops)
2426{
2427 struct rbd_img_request *img_request = obj_request->img_request;
2428 struct rbd_device *rbd_dev = img_request->rbd_dev;
2429 u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2430 u64 offset = obj_request->offset;
2431 u64 length = obj_request->length;
2432 u64 img_end;
2433 u16 opcode;
2434
2435 if (op_type == OBJ_OP_DISCARD) {
d3246fb0
JD
2436 if (!offset && length == object_size &&
2437 (!img_request_layered_test(img_request) ||
2438 !obj_request_overlaps_parent(obj_request))) {
3b434a2a
JD
2439 opcode = CEPH_OSD_OP_DELETE;
2440 } else if ((offset + length == object_size)) {
2441 opcode = CEPH_OSD_OP_TRUNCATE;
2442 } else {
2443 down_read(&rbd_dev->header_rwsem);
2444 img_end = rbd_dev->header.image_size;
2445 up_read(&rbd_dev->header_rwsem);
2446
2447 if (obj_request->img_offset + length == img_end)
2448 opcode = CEPH_OSD_OP_TRUNCATE;
2449 else
2450 opcode = CEPH_OSD_OP_ZERO;
2451 }
2452 } else if (op_type == OBJ_OP_WRITE) {
e30b7577
ID
2453 if (!offset && length == object_size)
2454 opcode = CEPH_OSD_OP_WRITEFULL;
2455 else
2456 opcode = CEPH_OSD_OP_WRITE;
3b434a2a
JD
2457 osd_req_op_alloc_hint_init(osd_request, num_ops,
2458 object_size, object_size);
2459 num_ops++;
2460 } else {
2461 opcode = CEPH_OSD_OP_READ;
2462 }
2463
7e868b6e 2464 if (opcode == CEPH_OSD_OP_DELETE)
144cba14 2465 osd_req_op_init(osd_request, num_ops, opcode, 0);
7e868b6e
ID
2466 else
2467 osd_req_op_extent_init(osd_request, num_ops, opcode,
2468 offset, length, 0, 0);
2469
3b434a2a
JD
2470 if (obj_request->type == OBJ_REQUEST_BIO)
2471 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2472 obj_request->bio_list, length);
2473 else if (obj_request->type == OBJ_REQUEST_PAGES)
2474 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2475 obj_request->pages, length,
2476 offset & ~PAGE_MASK, false, false);
2477
2478 /* Discards are also writes */
2479 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2480 rbd_osd_req_format_write(obj_request);
2481 else
2482 rbd_osd_req_format_read(obj_request);
2483}
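/*
 * Editor's summary of the opcode selection above (informational):
 *
 *   discard, whole object, no parent data needed -> CEPH_OSD_OP_DELETE
 *   discard reaching the end of object or image  -> CEPH_OSD_OP_TRUNCATE
 *   discard of an interior range                 -> CEPH_OSD_OP_ZERO
 *   write covering the whole object              -> CEPH_OSD_OP_WRITEFULL
 *   any other write                              -> CEPH_OSD_OP_WRITE
 *   read                                         -> CEPH_OSD_OP_READ
 *
 * Both write variants are preceded by an allocation hint op.
 */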
2484
f1a4739f
AE
2485/*
2486 * Split up an image request into one or more object requests, each
2487 * to a different object. The "type" parameter indicates whether
2488 * "data_desc" is the pointer to the head of a list of bio
2489 * structures, or the base of a page array. In either case this
2490 * function assumes data_desc describes memory sufficient to hold
2491 * all data described by the image request.
2492 */
2493static int rbd_img_request_fill(struct rbd_img_request *img_request,
2494 enum obj_request_type type,
2495 void *data_desc)
bf0d5f50
AE
2496{
2497 struct rbd_device *rbd_dev = img_request->rbd_dev;
2498 struct rbd_obj_request *obj_request = NULL;
2499 struct rbd_obj_request *next_obj_request;
a158073c 2500 struct bio *bio_list = NULL;
f1a4739f 2501 unsigned int bio_offset = 0;
a158073c 2502 struct page **pages = NULL;
6d2940c8 2503 enum obj_operation_type op_type;
7da22d29 2504 u64 img_offset;
bf0d5f50 2505 u64 resid;
bf0d5f50 2506
f1a4739f
AE
2507 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2508 (int)type, data_desc);
37206ee5 2509
7da22d29 2510 img_offset = img_request->offset;
bf0d5f50 2511 resid = img_request->length;
4dda41d3 2512 rbd_assert(resid > 0);
3b434a2a 2513 op_type = rbd_img_request_op_type(img_request);
f1a4739f
AE
2514
2515 if (type == OBJ_REQUEST_BIO) {
2516 bio_list = data_desc;
4f024f37
KO
2517 rbd_assert(img_offset ==
2518 bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
90e98c52 2519 } else if (type == OBJ_REQUEST_PAGES) {
f1a4739f
AE
2520 pages = data_desc;
2521 }
2522
bf0d5f50 2523 while (resid) {
2fa12320 2524 struct ceph_osd_request *osd_req;
bf0d5f50 2525 const char *object_name;
bf0d5f50
AE
2526 u64 offset;
2527 u64 length;
2528
7da22d29 2529 object_name = rbd_segment_name(rbd_dev, img_offset);
bf0d5f50
AE
2530 if (!object_name)
2531 goto out_unwind;
7da22d29
AE
2532 offset = rbd_segment_offset(rbd_dev, img_offset);
2533 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 2534 obj_request = rbd_obj_request_create(object_name,
f1a4739f 2535 offset, length, type);
78c2a44a
AE
2536 /* object request has its own copy of the object name */
2537 rbd_segment_name_free(object_name);
bf0d5f50
AE
2538 if (!obj_request)
2539 goto out_unwind;
62054da6 2540
03507db6
JD
2541 /*
2542 * set obj_request->img_request before creating the
2543 * osd_request so that it gets the right snapc
2544 */
2545 rbd_img_obj_request_add(img_request, obj_request);
bf0d5f50 2546
f1a4739f
AE
2547 if (type == OBJ_REQUEST_BIO) {
2548 unsigned int clone_size;
2549
2550 rbd_assert(length <= (u64)UINT_MAX);
2551 clone_size = (unsigned int)length;
2552 obj_request->bio_list =
2553 bio_chain_clone_range(&bio_list,
2554 &bio_offset,
2555 clone_size,
2224d879 2556 GFP_NOIO);
f1a4739f 2557 if (!obj_request->bio_list)
62054da6 2558 goto out_unwind;
90e98c52 2559 } else if (type == OBJ_REQUEST_PAGES) {
f1a4739f
AE
2560 unsigned int page_count;
2561
2562 obj_request->pages = pages;
2563 page_count = (u32)calc_pages_for(offset, length);
2564 obj_request->page_count = page_count;
2565 if ((offset + length) & ~PAGE_MASK)
2566 page_count--; /* more on last page */
2567 pages += page_count;
2568 }
bf0d5f50 2569
6d2940c8
GZ
2570 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2571 (op_type == OBJ_OP_WRITE) ? 2 : 1,
2572 obj_request);
2fa12320 2573 if (!osd_req)
62054da6 2574 goto out_unwind;
3b434a2a 2575
2fa12320 2576 obj_request->osd_req = osd_req;
2169238d 2577 obj_request->callback = rbd_img_obj_callback;
3b434a2a 2578 obj_request->img_offset = img_offset;
9d4df01f 2579
3b434a2a 2580 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
430c28c3 2581
3b434a2a 2582 rbd_img_request_get(img_request);
bf0d5f50 2583
7da22d29 2584 img_offset += length;
bf0d5f50
AE
2585 resid -= length;
2586 }
2587
2588 return 0;
2589
bf0d5f50
AE
2590out_unwind:
2591 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
42dd037c 2592 rbd_img_obj_request_del(img_request, obj_request);
bf0d5f50
AE
2593
2594 return -ENOMEM;
2595}
2596
0eefd470 2597static void
2761713d 2598rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
0eefd470
AE
2599{
2600 struct rbd_img_request *img_request;
2601 struct rbd_device *rbd_dev;
ebda6408 2602 struct page **pages;
0eefd470
AE
2603 u32 page_count;
2604
2761713d
ID
2605 dout("%s: obj %p\n", __func__, obj_request);
2606
d3246fb0
JD
2607 rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2608 obj_request->type == OBJ_REQUEST_NODATA);
0eefd470
AE
2609 rbd_assert(obj_request_img_data_test(obj_request));
2610 img_request = obj_request->img_request;
2611 rbd_assert(img_request);
2612
2613 rbd_dev = img_request->rbd_dev;
2614 rbd_assert(rbd_dev);
0eefd470 2615
ebda6408
AE
2616 pages = obj_request->copyup_pages;
2617 rbd_assert(pages != NULL);
0eefd470 2618 obj_request->copyup_pages = NULL;
ebda6408
AE
2619 page_count = obj_request->copyup_page_count;
2620 rbd_assert(page_count);
2621 obj_request->copyup_page_count = 0;
2622 ceph_release_page_vector(pages, page_count);
0eefd470
AE
2623
2624 /*
2625 * We want the transfer count to reflect the size of the
2626 * original write request. There is no such thing as a
2627 * successful short write, so if the request was successful
2628 * we can just set it to the originally-requested length.
2629 */
2630 if (!obj_request->result)
2631 obj_request->xferred = obj_request->length;
2632
2761713d 2633 obj_request_done_set(obj_request);
0eefd470
AE
2634}
2635
3d7efd18
AE
2636static void
2637rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2638{
2639 struct rbd_obj_request *orig_request;
0eefd470
AE
2640 struct ceph_osd_request *osd_req;
2641 struct ceph_osd_client *osdc;
2642 struct rbd_device *rbd_dev;
3d7efd18 2643 struct page **pages;
d3246fb0 2644 enum obj_operation_type op_type;
ebda6408 2645 u32 page_count;
bbea1c1a 2646 int img_result;
ebda6408 2647 u64 parent_length;
3d7efd18
AE
2648
2649 rbd_assert(img_request_child_test(img_request));
2650
2651 /* First get what we need from the image request */
2652
2653 pages = img_request->copyup_pages;
2654 rbd_assert(pages != NULL);
2655 img_request->copyup_pages = NULL;
ebda6408
AE
2656 page_count = img_request->copyup_page_count;
2657 rbd_assert(page_count);
2658 img_request->copyup_page_count = 0;
3d7efd18
AE
2659
2660 orig_request = img_request->obj_request;
2661 rbd_assert(orig_request != NULL);
b91f09f1 2662 rbd_assert(obj_request_type_valid(orig_request->type));
bbea1c1a 2663 img_result = img_request->result;
ebda6408
AE
2664 parent_length = img_request->length;
2665 rbd_assert(parent_length == img_request->xferred);
91c6febb 2666 rbd_img_request_put(img_request);
3d7efd18 2667
91c6febb
AE
2668 rbd_assert(orig_request->img_request);
2669 rbd_dev = orig_request->img_request->rbd_dev;
0eefd470 2670 rbd_assert(rbd_dev);
0eefd470 2671
bbea1c1a
AE
2672 /*
2673 * If the overlap has become 0 (most likely because the
2674 * image has been flattened) we need to free the pages
2675 * and re-submit the original write request.
2676 */
2677 if (!rbd_dev->parent_overlap) {
2678 struct ceph_osd_client *osdc;
3d7efd18 2679
bbea1c1a
AE
2680 ceph_release_page_vector(pages, page_count);
2681 osdc = &rbd_dev->rbd_client->client->osdc;
2682 img_result = rbd_obj_request_submit(osdc, orig_request);
2683 if (!img_result)
2684 return;
2685 }
0eefd470 2686
bbea1c1a 2687 if (img_result)
0eefd470 2688 goto out_err;
0eefd470 2689
8785b1d4
AE
2690 /*
2691 * The original osd request is of no use to us any more.
0ccd5926 2692 * We need a new one that can hold the two or three ops in a
2693 * copyup request. Allocate the new copyup osd request for the
2694 * original request, and release the old one.
2695 */
bbea1c1a 2696 img_result = -ENOMEM;
0eefd470
AE
2697 osd_req = rbd_osd_req_create_copyup(orig_request);
2698 if (!osd_req)
2699 goto out_err;
8785b1d4 2700 rbd_osd_req_destroy(orig_request->osd_req);
0eefd470
AE
2701 orig_request->osd_req = osd_req;
2702 orig_request->copyup_pages = pages;
ebda6408 2703 orig_request->copyup_page_count = page_count;
3d7efd18 2704
0eefd470 2705 /* Initialize the copyup op */
3d7efd18 2706
0eefd470 2707 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
ebda6408 2708 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
0eefd470 2709 false, false);
3d7efd18 2710
d3246fb0 2711 /* Add the other op(s) */
0eefd470 2712
d3246fb0
JD
2713 op_type = rbd_img_request_op_type(orig_request->img_request);
2714 rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
0eefd470
AE
2715
2716 /* All set, send it off. */
2717
0eefd470 2718 osdc = &rbd_dev->rbd_client->client->osdc;
bbea1c1a
AE
2719 img_result = rbd_obj_request_submit(osdc, orig_request);
2720 if (!img_result)
0eefd470
AE
2721 return;
2722out_err:
2723 /* Record the error code and complete the request */
2724
bbea1c1a 2725 orig_request->result = img_result;
0eefd470
AE
2726 orig_request->xferred = 0;
2727 obj_request_done_set(orig_request);
2728 rbd_obj_request_complete(orig_request);
3d7efd18
AE
2729}
2730
2731/*
2732 * Read from the parent image the range of data that covers the
2733 * entire target of the given object request. This is used for
2734 * satisfying a layered image write request when the target of an
2735 * object request from the image request does not exist.
2736 *
2737 * A page array big enough to hold the returned data is allocated
2738 * and supplied to rbd_img_request_fill() as the "data descriptor."
2739 * When the read completes, this page array will be transferred to
2740 * the original object request for the copyup operation.
2741 *
2742 * If an error occurs, record it as the result of the original
2743 * object request and mark it done so it gets completed.
2744 */
2745static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2746{
2747 struct rbd_img_request *img_request = NULL;
2748 struct rbd_img_request *parent_request = NULL;
2749 struct rbd_device *rbd_dev;
2750 u64 img_offset;
2751 u64 length;
2752 struct page **pages = NULL;
2753 u32 page_count;
2754 int result;
2755
2756 rbd_assert(obj_request_img_data_test(obj_request));
b91f09f1 2757 rbd_assert(obj_request_type_valid(obj_request->type));
3d7efd18
AE
2758
2759 img_request = obj_request->img_request;
2760 rbd_assert(img_request != NULL);
2761 rbd_dev = img_request->rbd_dev;
2762 rbd_assert(rbd_dev->parent != NULL);
2763
2764 /*
2765 * Determine the byte range covered by the object in the
2766 * child image to which the original request was to be sent.
2767 */
2768 img_offset = obj_request->img_offset - obj_request->offset;
2769 length = (u64)1 << rbd_dev->header.obj_order;
2770
a9e8ba2c
AE
2771 /*
2772 * There is no defined parent data beyond the parent
2773 * overlap, so limit what we read at that boundary if
2774 * necessary.
2775 */
2776 if (img_offset + length > rbd_dev->parent_overlap) {
2777 rbd_assert(img_offset < rbd_dev->parent_overlap);
2778 length = rbd_dev->parent_overlap - img_offset;
2779 }
2780
3d7efd18
AE
2781 /*
2782 * Allocate a page array big enough to receive the data read
2783 * from the parent.
2784 */
2785 page_count = (u32)calc_pages_for(0, length);
2786 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2787 if (IS_ERR(pages)) {
2788 result = PTR_ERR(pages);
2789 pages = NULL;
2790 goto out_err;
2791 }
2792
2793 result = -ENOMEM;
e93f3152
AE
2794 parent_request = rbd_parent_request_create(obj_request,
2795 img_offset, length);
3d7efd18
AE
2796 if (!parent_request)
2797 goto out_err;
3d7efd18
AE
2798
2799 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2800 if (result)
2801 goto out_err;
2802 parent_request->copyup_pages = pages;
ebda6408 2803 parent_request->copyup_page_count = page_count;
3d7efd18
AE
2804
2805 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2806 result = rbd_img_request_submit(parent_request);
2807 if (!result)
2808 return 0;
2809
2810 parent_request->copyup_pages = NULL;
ebda6408 2811 parent_request->copyup_page_count = 0;
3d7efd18
AE
2812 parent_request->obj_request = NULL;
2813 rbd_obj_request_put(obj_request);
2814out_err:
2815 if (pages)
2816 ceph_release_page_vector(pages, page_count);
2817 if (parent_request)
2818 rbd_img_request_put(parent_request);
2819 obj_request->result = result;
2820 obj_request->xferred = 0;
2821 obj_request_done_set(obj_request);
2822
2823 return result;
2824}
2825
c5b5ef6c
AE
2826static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2827{
c5b5ef6c 2828 struct rbd_obj_request *orig_request;
638f5abe 2829 struct rbd_device *rbd_dev;
c5b5ef6c
AE
2830 int result;
2831
2832 rbd_assert(!obj_request_img_data_test(obj_request));
2833
2834 /*
2835 * All we need from the object request is the original
2836 * request and the result of the STAT op. Grab those, then
2837 * we're done with the request.
2838 */
2839 orig_request = obj_request->obj_request;
2840 obj_request->obj_request = NULL;
912c317d 2841 rbd_obj_request_put(orig_request);
c5b5ef6c
AE
2842 rbd_assert(orig_request);
2843 rbd_assert(orig_request->img_request);
2844
2845 result = obj_request->result;
2846 obj_request->result = 0;
2847
2848 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2849 obj_request, orig_request, result,
2850 obj_request->xferred, obj_request->length);
2851 rbd_obj_request_put(obj_request);
2852
638f5abe
AE
2853 /*
2854 * If the overlap has become 0 (most likely because the
2855 * image has been flattened) we need to free the pages
2856 * and re-submit the original write request.
2857 */
2858 rbd_dev = orig_request->img_request->rbd_dev;
2859 if (!rbd_dev->parent_overlap) {
2860 struct ceph_osd_client *osdc;
2861
638f5abe
AE
2862 osdc = &rbd_dev->rbd_client->client->osdc;
2863 result = rbd_obj_request_submit(osdc, orig_request);
2864 if (!result)
2865 return;
2866 }
c5b5ef6c
AE
2867
2868 /*
2869 * Our only purpose here is to determine whether the object
2870 * exists, and we don't want to treat the non-existence as
2871 * an error. If something else comes back, transfer the
2872 * error to the original request and complete it now.
2873 */
2874 if (!result) {
2875 obj_request_existence_set(orig_request, true);
2876 } else if (result == -ENOENT) {
2877 obj_request_existence_set(orig_request, false);
2878 } else if (result) {
2879 orig_request->result = result;
3d7efd18 2880 goto out;
c5b5ef6c
AE
2881 }
2882
2883 /*
2884 * Resubmit the original request now that we have recorded
2885 * whether the target object exists.
2886 */
b454e36d 2887 orig_request->result = rbd_img_obj_request_submit(orig_request);
3d7efd18 2888out:
c5b5ef6c
AE
2889 if (orig_request->result)
2890 rbd_obj_request_complete(orig_request);
c5b5ef6c
AE
2891}
2892
2893static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2894{
2895 struct rbd_obj_request *stat_request;
2896 struct rbd_device *rbd_dev;
2897 struct ceph_osd_client *osdc;
2898 struct page **pages = NULL;
2899 u32 page_count;
2900 size_t size;
2901 int ret;
2902
2903 /*
2904 * The response data for a STAT call consists of:
2905 * le64 length;
2906 * struct {
2907 * le32 tv_sec;
2908 * le32 tv_nsec;
2909 * } mtime;
2910 */
2911 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2912 page_count = (u32)calc_pages_for(0, size);
2913 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2914 if (IS_ERR(pages))
2915 return PTR_ERR(pages);
2916
2917 ret = -ENOMEM;
2918 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2919 OBJ_REQUEST_PAGES);
2920 if (!stat_request)
2921 goto out;
2922
2923 rbd_obj_request_get(obj_request);
2924 stat_request->obj_request = obj_request;
2925 stat_request->pages = pages;
2926 stat_request->page_count = page_count;
2927
2928 rbd_assert(obj_request->img_request);
2929 rbd_dev = obj_request->img_request->rbd_dev;
6d2940c8 2930 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
deb236b3 2931 stat_request);
c5b5ef6c
AE
2932 if (!stat_request->osd_req)
2933 goto out;
2934 stat_request->callback = rbd_img_obj_exists_callback;
2935
144cba14 2936 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
c5b5ef6c
AE
2937 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2938 false, false);
9d4df01f 2939 rbd_osd_req_format_read(stat_request);
c5b5ef6c
AE
2940
2941 osdc = &rbd_dev->rbd_client->client->osdc;
2942 ret = rbd_obj_request_submit(osdc, stat_request);
2943out:
2944 if (ret)
2945 rbd_obj_request_put(obj_request);
2946
2947 return ret;
2948}
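/*
 * Editor's illustrative sketch (not part of the driver): decoding the
 * STAT reply laid out in the comment above from the first reply page.
 * The helper name and the -ERANGE return are assumptions made for
 * illustration.
 */
static int __maybe_unused example_decode_stat_reply(struct page **pages,
						    size_t reply_len,
						    u64 *object_size)
{
	void *p = page_address(pages[0]);
	void *end = p + reply_len;
	u32 tv_sec, tv_nsec;

	ceph_decode_64_safe(&p, end, *object_size, bad);
	ceph_decode_32_safe(&p, end, tv_sec, bad);	/* mtime.tv_sec */
	ceph_decode_32_safe(&p, end, tv_nsec, bad);	/* mtime.tv_nsec */
	return 0;
bad:
	return -ERANGE;
}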
2949
70d045f6 2950static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
b454e36d
AE
2951{
2952 struct rbd_img_request *img_request;
a9e8ba2c 2953 struct rbd_device *rbd_dev;
b454e36d
AE
2954
2955 rbd_assert(obj_request_img_data_test(obj_request));
2956
2957 img_request = obj_request->img_request;
2958 rbd_assert(img_request);
a9e8ba2c 2959 rbd_dev = img_request->rbd_dev;
b454e36d 2960
70d045f6 2961 /* Reads */
1c220881
JD
2962 if (!img_request_write_test(img_request) &&
2963 !img_request_discard_test(img_request))
70d045f6
ID
2964 return true;
2965
2966 /* Non-layered writes */
2967 if (!img_request_layered_test(img_request))
2968 return true;
2969
b454e36d 2970 /*
70d045f6
ID
2971 * Layered writes outside of the parent overlap range don't
2972 * share any data with the parent.
b454e36d 2973 */
70d045f6
ID
2974 if (!obj_request_overlaps_parent(obj_request))
2975 return true;
b454e36d 2976
c622d226
GZ
2977 /*
2978 * Entire-object layered writes - we will overwrite whatever
2979 * parent data there is anyway.
2980 */
2981 if (!obj_request->offset &&
2982 obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2983 return true;
2984
70d045f6
ID
2985 /*
2986 * If the object is known to already exist, its parent data has
2987 * already been copied.
2988 */
2989 if (obj_request_known_test(obj_request) &&
2990 obj_request_exists_test(obj_request))
2991 return true;
2992
2993 return false;
2994}
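/*
 * Editor's summary (informational): an object request is "simple" --
 * it can be sent straight to the OSD -- when any of these holds:
 *   1. it is a read;
 *   2. it is a write/discard to a non-layered image;
 *   3. it falls entirely outside the parent overlap;
 *   4. it overwrites the whole object, or the object is already known
 *      to exist (so its parent data has been copied up).
 */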
2995
2996static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2997{
2998 if (img_obj_request_simple(obj_request)) {
b454e36d
AE
2999 struct rbd_device *rbd_dev;
3000 struct ceph_osd_client *osdc;
3001
3002 rbd_dev = obj_request->img_request->rbd_dev;
3003 osdc = &rbd_dev->rbd_client->client->osdc;
3004
3005 return rbd_obj_request_submit(osdc, obj_request);
3006 }
3007
3008 /*
3009 * It's a layered write. The target object might exist but
3010 * we may not know that yet. If we know it doesn't exist,
3011 * start by reading the data for the full target object from
3012 * the parent so we can use it for a copyup to the target.
b454e36d 3013 */
70d045f6 3014 if (obj_request_known_test(obj_request))
3d7efd18
AE
3015 return rbd_img_obj_parent_read_full(obj_request);
3016
3017 /* We don't know whether the target exists. Go find out. */
b454e36d
AE
3018
3019 return rbd_img_obj_exists_submit(obj_request);
3020}
3021
bf0d5f50
AE
3022static int rbd_img_request_submit(struct rbd_img_request *img_request)
3023{
bf0d5f50 3024 struct rbd_obj_request *obj_request;
46faeed4 3025 struct rbd_obj_request *next_obj_request;
663ae2cc 3026 int ret = 0;
bf0d5f50 3027
37206ee5 3028 dout("%s: img %p\n", __func__, img_request);
bf0d5f50 3029
663ae2cc
ID
3030 rbd_img_request_get(img_request);
3031 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
b454e36d 3032 ret = rbd_img_obj_request_submit(obj_request);
bf0d5f50 3033 if (ret)
663ae2cc 3034 goto out_put_ireq;
bf0d5f50
AE
3035 }
3036
663ae2cc
ID
3037out_put_ireq:
3038 rbd_img_request_put(img_request);
3039 return ret;
bf0d5f50 3040}
8b3e1a56
AE
3041
3042static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
3043{
3044 struct rbd_obj_request *obj_request;
a9e8ba2c
AE
3045 struct rbd_device *rbd_dev;
3046 u64 obj_end;
02c74fba
AE
3047 u64 img_xferred;
3048 int img_result;
8b3e1a56
AE
3049
3050 rbd_assert(img_request_child_test(img_request));
3051
02c74fba
AE
3052 /* First get what we need from the image request and release it */
3053
8b3e1a56 3054 obj_request = img_request->obj_request;
02c74fba
AE
3055 img_xferred = img_request->xferred;
3056 img_result = img_request->result;
3057 rbd_img_request_put(img_request);
3058
3059 /*
3060 * If the overlap has become 0 (most likely because the
3061 * image has been flattened) we need to re-submit the
3062 * original request.
3063 */
a9e8ba2c
AE
3064 rbd_assert(obj_request);
3065 rbd_assert(obj_request->img_request);
02c74fba
AE
3066 rbd_dev = obj_request->img_request->rbd_dev;
3067 if (!rbd_dev->parent_overlap) {
3068 struct ceph_osd_client *osdc;
3069
3070 osdc = &rbd_dev->rbd_client->client->osdc;
3071 img_result = rbd_obj_request_submit(osdc, obj_request);
3072 if (!img_result)
3073 return;
3074 }
a9e8ba2c 3075
02c74fba 3076 obj_request->result = img_result;
a9e8ba2c
AE
3077 if (obj_request->result)
3078 goto out;
3079
3080 /*
3081 * We need to zero anything beyond the parent overlap
3082 * boundary. Since rbd_img_obj_request_read_callback()
3083 * will zero anything beyond the end of a short read, an
3084 * easy way to do this is to pretend the data from the
3085 * parent came up short--ending at the overlap boundary.
3086 */
3087 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
3088 obj_end = obj_request->img_offset + obj_request->length;
a9e8ba2c
AE
3089 if (obj_end > rbd_dev->parent_overlap) {
3090 u64 xferred = 0;
3091
3092 if (obj_request->img_offset < rbd_dev->parent_overlap)
3093 xferred = rbd_dev->parent_overlap -
3094 obj_request->img_offset;
8b3e1a56 3095
02c74fba 3096 obj_request->xferred = min(img_xferred, xferred);
a9e8ba2c 3097 } else {
02c74fba 3098 obj_request->xferred = img_xferred;
a9e8ba2c
AE
3099 }
3100out:
8b3e1a56
AE
3101 rbd_img_obj_request_read_callback(obj_request);
3102 rbd_obj_request_complete(obj_request);
3103}
3104
3105static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
3106{
8b3e1a56
AE
3107 struct rbd_img_request *img_request;
3108 int result;
3109
3110 rbd_assert(obj_request_img_data_test(obj_request));
3111 rbd_assert(obj_request->img_request != NULL);
3112 rbd_assert(obj_request->result == (s32) -ENOENT);
5b2ab72d 3113 rbd_assert(obj_request_type_valid(obj_request->type));
8b3e1a56 3114
8b3e1a56 3115 /* rbd_read_finish(obj_request, obj_request->length); */
e93f3152 3116 img_request = rbd_parent_request_create(obj_request,
8b3e1a56 3117 obj_request->img_offset,
e93f3152 3118 obj_request->length);
8b3e1a56
AE
3119 result = -ENOMEM;
3120 if (!img_request)
3121 goto out_err;
3122
5b2ab72d
AE
3123 if (obj_request->type == OBJ_REQUEST_BIO)
3124 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3125 obj_request->bio_list);
3126 else
3127 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3128 obj_request->pages);
8b3e1a56
AE
3129 if (result)
3130 goto out_err;
3131
3132 img_request->callback = rbd_img_parent_read_callback;
3133 result = rbd_img_request_submit(img_request);
3134 if (result)
3135 goto out_err;
3136
3137 return;
3138out_err:
3139 if (img_request)
3140 rbd_img_request_put(img_request);
3141 obj_request->result = result;
3142 obj_request->xferred = 0;
3143 obj_request_done_set(obj_request);
3144}
bf0d5f50 3145
ed95b21a
ID
3146static const struct rbd_client_id rbd_empty_cid;
3147
3148static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3149 const struct rbd_client_id *rhs)
3150{
3151 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3152}
3153
3154static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3155{
3156 struct rbd_client_id cid;
3157
3158 mutex_lock(&rbd_dev->watch_mutex);
3159 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3160 cid.handle = rbd_dev->watch_cookie;
3161 mutex_unlock(&rbd_dev->watch_mutex);
3162 return cid;
3163}
3164
3165/*
3166 * lock_rwsem must be held for write
3167 */
3168static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3169 const struct rbd_client_id *cid)
3170{
3171 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3172 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3173 cid->gid, cid->handle);
3174 rbd_dev->owner_cid = *cid; /* struct */
3175}
3176
3177static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3178{
3179 mutex_lock(&rbd_dev->watch_mutex);
3180 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3181 mutex_unlock(&rbd_dev->watch_mutex);
3182}
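/*
 * Editor's note (informational): the cookie produced above has the
 * form "<RBD_LOCK_COOKIE_PREFIX> <watch_cookie>"; find_watcher()
 * below parses the numeric part back out with the matching sscanf().
 */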
3183
3184/*
3185 * lock_rwsem must be held for write
3186 */
3187static int rbd_lock(struct rbd_device *rbd_dev)
3188{
3189 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3190 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3191 char cookie[32];
3192 int ret;
3193
3194 WARN_ON(__rbd_is_lock_owner(rbd_dev));
3195
3196 format_lock_cookie(rbd_dev, cookie);
3197 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3198 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3199 RBD_LOCK_TAG, "", 0);
3200 if (ret)
3201 return ret;
3202
3203 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3204 rbd_set_owner_cid(rbd_dev, &cid);
3205 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3206 return 0;
3207}
3208
3209/*
3210 * lock_rwsem must be held for write
3211 */
3212static int rbd_unlock(struct rbd_device *rbd_dev)
b8d70035 3213{
922dab61 3214 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a 3215 char cookie[32];
e627db08 3216 int ret;
b8d70035 3217
ed95b21a
ID
3218 WARN_ON(!__rbd_is_lock_owner(rbd_dev));
3219
3220 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3221
3222 format_lock_cookie(rbd_dev, cookie);
3223 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3224 RBD_LOCK_NAME, cookie);
3225 if (ret && ret != -ENOENT) {
3226 rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
3227 return ret;
3228 }
3229
3230 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3231 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3232 return 0;
3233}
3234
3235static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3236 enum rbd_notify_op notify_op,
3237 struct page ***preply_pages,
3238 size_t *preply_len)
3239{
3240 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3241 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3242 int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3243 char buf[buf_size];
3244 void *p = buf;
3245
3246 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3247
3248 /* encode *LockPayload NotifyMessage (op + ClientId) */
3249 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3250 ceph_encode_32(&p, notify_op);
3251 ceph_encode_64(&p, cid.gid);
3252 ceph_encode_64(&p, cid.handle);
3253
3254 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3255 &rbd_dev->header_oloc, buf, buf_size,
3256 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3257}
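/*
 * Editor's summary of the notify payload built above (informational):
 *
 *   ceph encoding header (struct_v 2, compat 1, payload length)
 *   le32 notify_op
 *   le64 cid.gid
 *   le64 cid.handle
 *
 * which accounts for buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN.
 */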
3258
3259static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3260 enum rbd_notify_op notify_op)
3261{
3262 struct page **reply_pages;
3263 size_t reply_len;
3264
3265 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3266 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3267}
3268
3269static void rbd_notify_acquired_lock(struct work_struct *work)
3270{
3271 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3272 acquired_lock_work);
3273
3274 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3275}
3276
3277static void rbd_notify_released_lock(struct work_struct *work)
3278{
3279 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3280 released_lock_work);
3281
3282 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3283}
3284
3285static int rbd_request_lock(struct rbd_device *rbd_dev)
3286{
3287 struct page **reply_pages;
3288 size_t reply_len;
3289 bool lock_owner_responded = false;
3290 int ret;
52bb1f9b 3291
ed95b21a
ID
3292 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3293
3294 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3295 &reply_pages, &reply_len);
3296 if (ret && ret != -ETIMEDOUT) {
3297 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3298 goto out;
3299 }
3300
3301 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3302 void *p = page_address(reply_pages[0]);
3303 void *const end = p + reply_len;
3304 u32 n;
3305
3306 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3307 while (n--) {
3308 u8 struct_v;
3309 u32 len;
3310
3311 ceph_decode_need(&p, end, 8 + 8, e_inval);
3312 p += 8 + 8; /* skip gid and cookie */
3313
3314 ceph_decode_32_safe(&p, end, len, e_inval);
3315 if (!len)
3316 continue;
3317
3318 if (lock_owner_responded) {
3319 rbd_warn(rbd_dev,
3320 "duplicate lock owners detected");
3321 ret = -EIO;
3322 goto out;
3323 }
3324
3325 lock_owner_responded = true;
3326 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3327 &struct_v, &len);
3328 if (ret) {
3329 rbd_warn(rbd_dev,
3330 "failed to decode ResponseMessage: %d",
3331 ret);
3332 goto e_inval;
3333 }
3334
3335 ret = ceph_decode_32(&p);
3336 }
3337 }
3338
3339 if (!lock_owner_responded) {
3340 rbd_warn(rbd_dev, "no lock owners detected");
3341 ret = -ETIMEDOUT;
3342 }
3343
3344out:
3345 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3346 return ret;
3347
3348e_inval:
3349 ret = -EINVAL;
3350 goto out;
3351}
3352
3353static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3354{
3355 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3356
3357 cancel_delayed_work(&rbd_dev->lock_dwork);
3358 if (wake_all)
3359 wake_up_all(&rbd_dev->lock_waitq);
3360 else
3361 wake_up(&rbd_dev->lock_waitq);
3362}
3363
3364static int get_lock_owner_info(struct rbd_device *rbd_dev,
3365 struct ceph_locker **lockers, u32 *num_lockers)
3366{
3367 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3368 u8 lock_type;
3369 char *lock_tag;
3370 int ret;
3371
3372 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3373
3374 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3375 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3376 &lock_type, &lock_tag, lockers, num_lockers);
3377 if (ret)
3378 return ret;
3379
3380 if (*num_lockers == 0) {
3381 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3382 goto out;
3383 }
3384
3385 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3386 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3387 lock_tag);
3388 ret = -EBUSY;
3389 goto out;
3390 }
3391
3392 if (lock_type == CEPH_CLS_LOCK_SHARED) {
3393 rbd_warn(rbd_dev, "shared lock type detected");
3394 ret = -EBUSY;
3395 goto out;
3396 }
3397
3398 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3399 strlen(RBD_LOCK_COOKIE_PREFIX))) {
3400 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3401 (*lockers)[0].id.cookie);
3402 ret = -EBUSY;
3403 goto out;
3404 }
3405
3406out:
3407 kfree(lock_tag);
3408 return ret;
3409}
3410
3411static int find_watcher(struct rbd_device *rbd_dev,
3412 const struct ceph_locker *locker)
3413{
3414 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3415 struct ceph_watch_item *watchers;
3416 u32 num_watchers;
3417 u64 cookie;
3418 int i;
3419 int ret;
3420
3421 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3422 &rbd_dev->header_oloc, &watchers,
3423 &num_watchers);
3424 if (ret)
3425 return ret;
3426
3427 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3428 for (i = 0; i < num_watchers; i++) {
3429 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3430 sizeof(locker->info.addr)) &&
3431 watchers[i].cookie == cookie) {
3432 struct rbd_client_id cid = {
3433 .gid = le64_to_cpu(watchers[i].name.num),
3434 .handle = cookie,
3435 };
3436
3437 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3438 rbd_dev, cid.gid, cid.handle);
3439 rbd_set_owner_cid(rbd_dev, &cid);
3440 ret = 1;
3441 goto out;
3442 }
3443 }
3444
3445 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3446 ret = 0;
3447out:
3448 kfree(watchers);
3449 return ret;
3450}
3451
3452/*
3453 * lock_rwsem must be held for write
3454 */
3455static int rbd_try_lock(struct rbd_device *rbd_dev)
3456{
3457 struct ceph_client *client = rbd_dev->rbd_client->client;
3458 struct ceph_locker *lockers;
3459 u32 num_lockers;
3460 int ret;
3461
3462 for (;;) {
3463 ret = rbd_lock(rbd_dev);
3464 if (ret != -EBUSY)
3465 return ret;
3466
3467 /* determine if the current lock holder is still alive */
3468 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3469 if (ret)
3470 return ret;
3471
3472 if (num_lockers == 0)
3473 goto again;
3474
3475 ret = find_watcher(rbd_dev, lockers);
3476 if (ret) {
3477 if (ret > 0)
3478 ret = 0; /* have to request lock */
3479 goto out;
3480 }
3481
3482 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3483 ENTITY_NAME(lockers[0].id.name));
3484
3485 ret = ceph_monc_blacklist_add(&client->monc,
3486 &lockers[0].info.addr);
3487 if (ret) {
3488 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3489 ENTITY_NAME(lockers[0].id.name), ret);
3490 goto out;
3491 }
3492
3493 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3494 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3495 lockers[0].id.cookie,
3496 &lockers[0].id.name);
3497 if (ret && ret != -ENOENT)
3498 goto out;
3499
3500again:
3501 ceph_free_lockers(lockers, num_lockers);
3502 }
3503
3504out:
3505 ceph_free_lockers(lockers, num_lockers);
3506 return ret;
3507}
3508
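/*
 * To summarize rbd_try_lock() above: rbd_lock() is retried until it
 * either succeeds or fails with something other than -EBUSY.  On -EBUSY
 * the current holder is looked up; a holder with no watch established
 * is presumed dead, so it is blacklisted and its lock broken before
 * retrying.  A live holder (find_watcher() returning 1) means the lock
 * must be requested from it instead, hence ret = 0.
 */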
3509/*
3510 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3511 */
3512static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3513 int *pret)
3514{
3515 enum rbd_lock_state lock_state;
3516
3517 down_read(&rbd_dev->lock_rwsem);
3518 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3519 rbd_dev->lock_state);
3520 if (__rbd_is_lock_owner(rbd_dev)) {
3521 lock_state = rbd_dev->lock_state;
3522 up_read(&rbd_dev->lock_rwsem);
3523 return lock_state;
3524 }
3525
3526 up_read(&rbd_dev->lock_rwsem);
3527 down_write(&rbd_dev->lock_rwsem);
3528 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3529 rbd_dev->lock_state);
3530 if (!__rbd_is_lock_owner(rbd_dev)) {
3531 *pret = rbd_try_lock(rbd_dev);
3532 if (*pret)
3533 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3534 }
3535
3536 lock_state = rbd_dev->lock_state;
3537 up_write(&rbd_dev->lock_rwsem);
3538 return lock_state;
3539}
3540
3541static void rbd_acquire_lock(struct work_struct *work)
3542{
3543 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3544 struct rbd_device, lock_dwork);
3545 enum rbd_lock_state lock_state;
3546 int ret;
3547
3548 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3549again:
3550 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3551 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3552 if (lock_state == RBD_LOCK_STATE_LOCKED)
3553 wake_requests(rbd_dev, true);
3554 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3555 rbd_dev, lock_state, ret);
3556 return;
3557 }
3558
3559 ret = rbd_request_lock(rbd_dev);
3560 if (ret == -ETIMEDOUT) {
3561 goto again; /* treat this as a dead client */
3562 } else if (ret < 0) {
3563 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3564 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3565 RBD_RETRY_DELAY);
3566 } else {
3567 /*
3568 * lock owner acked, but resend if we don't see them
3569 * release the lock
3570 */
3571 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3572 rbd_dev);
3573 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3574 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3575 }
3576}
3577
3578/*
3579 * lock_rwsem must be held for write
3580 */
3581static bool rbd_release_lock(struct rbd_device *rbd_dev)
3582{
3583 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3584 rbd_dev->lock_state);
3585 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3586 return false;
3587
3588 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3589 downgrade_write(&rbd_dev->lock_rwsem);
3590 /*
3591 * Ensure that all in-flight IO is flushed.
3592 *
3593 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3594 * may be shared with other devices.
3595 */
3596 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3597 up_read(&rbd_dev->lock_rwsem);
3598
3599 down_write(&rbd_dev->lock_rwsem);
3600 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3601 rbd_dev->lock_state);
3602 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3603 return false;
3604
3605 if (!rbd_unlock(rbd_dev))
3606 /*
3607 * Give others a chance to grab the lock - we would re-acquire
3608 * almost immediately if we got new IO during ceph_osdc_sync()
3609 * otherwise. We need to ack our own notifications, so this
3610 * lock_dwork will be requeued from rbd_wait_state_locked()
3611 * after wake_requests() in rbd_handle_released_lock().
3612 */
3613 cancel_delayed_work(&rbd_dev->lock_dwork);
3614
3615 return true;
3616}
3617
3618static void rbd_release_lock_work(struct work_struct *work)
3619{
3620 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3621 unlock_work);
3622
3623 down_write(&rbd_dev->lock_rwsem);
3624 rbd_release_lock(rbd_dev);
3625 up_write(&rbd_dev->lock_rwsem);
3626}
3627
3628static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3629 void **p)
3630{
3631 struct rbd_client_id cid = { 0 };
3632
3633 if (struct_v >= 2) {
3634 cid.gid = ceph_decode_64(p);
3635 cid.handle = ceph_decode_64(p);
3636 }
3637
3638 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3639 cid.handle);
3640 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3641 down_write(&rbd_dev->lock_rwsem);
3642 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3643 /*
3644 * we already know that the remote client is
3645 * the owner
3646 */
3647 up_write(&rbd_dev->lock_rwsem);
3648 return;
3649 }
3650
3651 rbd_set_owner_cid(rbd_dev, &cid);
3652 downgrade_write(&rbd_dev->lock_rwsem);
3653 } else {
3654 down_read(&rbd_dev->lock_rwsem);
3655 }
3656
3657 if (!__rbd_is_lock_owner(rbd_dev))
3658 wake_requests(rbd_dev, false);
3659 up_read(&rbd_dev->lock_rwsem);
3660}
3661
3662static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3663 void **p)
3664{
3665 struct rbd_client_id cid = { 0 };
3666
3667 if (struct_v >= 2) {
3668 cid.gid = ceph_decode_64(p);
3669 cid.handle = ceph_decode_64(p);
3670 }
3671
3672 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3673 cid.handle);
3674 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3675 down_write(&rbd_dev->lock_rwsem);
3676 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3677 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3678 __func__, rbd_dev, cid.gid, cid.handle,
3679 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3680 up_write(&rbd_dev->lock_rwsem);
3681 return;
3682 }
3683
3684 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3685 downgrade_write(&rbd_dev->lock_rwsem);
3686 } else {
3687 down_read(&rbd_dev->lock_rwsem);
3688 }
3689
3690 if (!__rbd_is_lock_owner(rbd_dev))
3691 wake_requests(rbd_dev, false);
3692 up_read(&rbd_dev->lock_rwsem);
3693}
3694
3695static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3696 void **p)
3697{
3698 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3699 struct rbd_client_id cid = { 0 };
3700 bool need_to_send;
3701
3702 if (struct_v >= 2) {
3703 cid.gid = ceph_decode_64(p);
3704 cid.handle = ceph_decode_64(p);
3705 }
3706
3707 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3708 cid.handle);
3709 if (rbd_cid_equal(&cid, &my_cid))
3710 return false;
3711
3712 down_read(&rbd_dev->lock_rwsem);
3713 need_to_send = __rbd_is_lock_owner(rbd_dev);
3714 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3715 if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
3716 dout("%s rbd_dev %p queueing unlock_work\n", __func__,
3717 rbd_dev);
3718 queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
3719 }
3720 }
3721 up_read(&rbd_dev->lock_rwsem);
3722 return need_to_send;
3723}
3724
3725static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3726 u64 notify_id, u64 cookie, s32 *result)
3727{
3728 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3729 int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3730 char buf[buf_size];
3731 int ret;
3732
3733 if (result) {
3734 void *p = buf;
3735
3736 /* encode ResponseMessage */
3737 ceph_start_encoding(&p, 1, 1,
3738 buf_size - CEPH_ENCODING_START_BLK_LEN);
3739 ceph_encode_32(&p, *result);
3740 } else {
3741 buf_size = 0;
3742 }
3743
3744 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3745 &rbd_dev->header_oloc, notify_id, cookie,
3746 buf, buf_size);
3747 if (ret)
3748 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3749}
3750
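/*
 * Sketch of the ack payload built by __rbd_acknowledge_notify() when a
 * result is supplied (sizes follow from buf_size above):
 *
 *	encoding header (u8 struct_v, u8 compat, le32 len)
 *	le32 result;
 *
 * Without a result, a zero-length payload is acked instead.
 */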
3751static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3752 u64 cookie)
3753{
3754 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3755 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3756}
3757
3758static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3759 u64 notify_id, u64 cookie, s32 result)
3760{
3761 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3762 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3763}
3764
3765static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3766 u64 notifier_id, void *data, size_t data_len)
3767{
3768 struct rbd_device *rbd_dev = arg;
3769 void *p = data;
3770 void *const end = p + data_len;
3771 u8 struct_v;
3772 u32 len;
3773 u32 notify_op;
3774 int ret;
3775
3776 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3777 __func__, rbd_dev, cookie, notify_id, data_len);
3778 if (data_len) {
3779 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3780 &struct_v, &len);
3781 if (ret) {
3782 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3783 ret);
3784 return;
3785 }
3786
3787 notify_op = ceph_decode_32(&p);
3788 } else {
3789 /* legacy notification for header updates */
3790 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3791 len = 0;
3792 }
3793
3794 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3795 switch (notify_op) {
3796 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3797 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3798 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3799 break;
3800 case RBD_NOTIFY_OP_RELEASED_LOCK:
3801 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3802 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3803 break;
3804 case RBD_NOTIFY_OP_REQUEST_LOCK:
3805 if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
3806 /*
3807 * send ResponseMessage(0) back so the client
3808 * can detect a missing owner
3809 */
3810 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3811 cookie, 0);
3812 else
3813 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3814 break;
3815 case RBD_NOTIFY_OP_HEADER_UPDATE:
3816 ret = rbd_dev_refresh(rbd_dev);
3817 if (ret)
3818 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3819
3820 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3821 break;
3822 default:
3823 if (rbd_is_lock_owner(rbd_dev))
3824 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3825 cookie, -EOPNOTSUPP);
3826 else
3827 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3828 break;
3829 }
3830}
3831
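/*
 * Sketch of the NotifyMessage payload handled by rbd_watch_cb() above,
 * as implied by the decode calls (names are descriptive only):
 *
 *	encoding header (u8 struct_v, u8 compat, le32 len)
 *	le32 notify_op;
 *	for the lock ops, with struct_v >= 2:
 *		le64 gid;
 *		le64 cookie;	(together: the sender's client id)
 *
 * A zero-length payload is a legacy header-update notification.
 */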
3832static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3833
3834static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3835{
3836 struct rbd_device *rbd_dev = arg;
3837
3838 rbd_warn(rbd_dev, "encountered watch error: %d", err);
3839
3840 down_write(&rbd_dev->lock_rwsem);
3841 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3842 up_write(&rbd_dev->lock_rwsem);
3843
3844 mutex_lock(&rbd_dev->watch_mutex);
3845 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3846 __rbd_unregister_watch(rbd_dev);
3847 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3848
3849 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3850 }
3851 mutex_unlock(&rbd_dev->watch_mutex);
3852}
3853
3854/*
3855 * watch_mutex must be locked
3856 */
3857static int __rbd_register_watch(struct rbd_device *rbd_dev)
3858{
3859 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3860 struct ceph_osd_linger_request *handle;
3861
3862 rbd_assert(!rbd_dev->watch_handle);
3863 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3864
3865 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3866 &rbd_dev->header_oloc, rbd_watch_cb,
3867 rbd_watch_errcb, rbd_dev);
3868 if (IS_ERR(handle))
3869 return PTR_ERR(handle);
3870
3871 rbd_dev->watch_handle = handle;
3872 return 0;
3873}
3874
3875/*
3876 * watch_mutex must be locked
3877 */
3878static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3879{
3880 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3881 int ret;
3882
3883 rbd_assert(rbd_dev->watch_handle);
3884 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3885
3886 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3887 if (ret)
3888 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3889
3890 rbd_dev->watch_handle = NULL;
3891}
3892
3893static int rbd_register_watch(struct rbd_device *rbd_dev)
3894{
3895 int ret;
3896
3897 mutex_lock(&rbd_dev->watch_mutex);
3898 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3899 ret = __rbd_register_watch(rbd_dev);
3900 if (ret)
3901 goto out;
3902
3903 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3904 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3905
3906out:
3907 mutex_unlock(&rbd_dev->watch_mutex);
3908 return ret;
3909}
3910
3911static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3912{
3913 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3914
3915 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3916 cancel_work_sync(&rbd_dev->acquired_lock_work);
3917 cancel_work_sync(&rbd_dev->released_lock_work);
3918 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3919 cancel_work_sync(&rbd_dev->unlock_work);
3920}
3921
3922static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3923{
3924 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3925 cancel_tasks_sync(rbd_dev);
3926
3927 mutex_lock(&rbd_dev->watch_mutex);
3928 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3929 __rbd_unregister_watch(rbd_dev);
3930 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3931 mutex_unlock(&rbd_dev->watch_mutex);
3932
3933 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3934}
3935
3936static void rbd_reregister_watch(struct work_struct *work)
3937{
3938 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3939 struct rbd_device, watch_dwork);
3940 bool was_lock_owner = false;
3941 int ret;
3942
3943 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3944
3945 down_write(&rbd_dev->lock_rwsem);
3946 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3947 was_lock_owner = rbd_release_lock(rbd_dev);
3948
3949 mutex_lock(&rbd_dev->watch_mutex);
3950 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR)
3951 goto fail_unlock;
3952
3953 ret = __rbd_register_watch(rbd_dev);
3954 if (ret) {
3955 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3956 if (ret != -EBLACKLISTED)
3957 queue_delayed_work(rbd_dev->task_wq,
3958 &rbd_dev->watch_dwork,
3959 RBD_RETRY_DELAY);
3960 goto fail_unlock;
3961 }
3962
3963 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3964 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3965 mutex_unlock(&rbd_dev->watch_mutex);
3966
3967 ret = rbd_dev_refresh(rbd_dev);
3968 if (ret)
3969 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3970
3971 if (was_lock_owner) {
3972 ret = rbd_try_lock(rbd_dev);
3973 if (ret)
3974 rbd_warn(rbd_dev, "reregistration lock failed: %d",
3975 ret);
3976 }
3977
3978 up_write(&rbd_dev->lock_rwsem);
3979 wake_requests(rbd_dev, true);
3980 return;
3981
3982fail_unlock:
3983 mutex_unlock(&rbd_dev->watch_mutex);
3984 up_write(&rbd_dev->lock_rwsem);
3985}
3986
3987/*
3988 * Synchronous osd object method call. Returns the number of bytes
3989 * returned in the inbound buffer, or a negative error code.
3990 */
3991static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3992 const char *object_name,
3993 const char *class_name,
3994 const char *method_name,
3995 const void *outbound,
3996 size_t outbound_size,
3997 void *inbound,
3998 size_t inbound_size)
3999{
4000 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4001 struct rbd_obj_request *obj_request;
4002 struct page **pages;
4003 u32 page_count;
4004 int ret;
4005
4006 /*
4007 * Method calls are ultimately read operations. The result
4008 * should be placed into the inbound buffer provided. They
4009 * also supply outbound data--parameters for the object
4010 * method. Currently if this is present it will be a
4011 * snapshot id.
4012 */
4013 page_count = (u32)calc_pages_for(0, inbound_size);
4014 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
4015 if (IS_ERR(pages))
4016 return PTR_ERR(pages);
4017
4018 ret = -ENOMEM;
4019 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
4020 OBJ_REQUEST_PAGES);
4021 if (!obj_request)
4022 goto out;
4023
4024 obj_request->pages = pages;
4025 obj_request->page_count = page_count;
4026
4027 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
4028 obj_request);
4029 if (!obj_request->osd_req)
4030 goto out;
4031
4032 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
4033 class_name, method_name);
4034 if (outbound_size) {
4035 struct ceph_pagelist *pagelist;
4036
4037 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
4038 if (!pagelist)
4039 goto out;
4040
4041 ceph_pagelist_init(pagelist);
4042 ceph_pagelist_append(pagelist, outbound, outbound_size);
4043 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
4044 pagelist);
4045 }
4046 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
4047 obj_request->pages, inbound_size,
4048 0, false, false);
4049 rbd_osd_req_format_read(obj_request);
4050
4051 ret = rbd_obj_request_submit(osdc, obj_request);
4052 if (ret)
4053 goto out;
4054 ret = rbd_obj_request_wait(obj_request);
4055 if (ret)
4056 goto out;
4057
4058 ret = obj_request->result;
4059 if (ret < 0)
4060 goto out;
4061
4062 rbd_assert(obj_request->xferred < (u64)INT_MAX);
4063 ret = (int)obj_request->xferred;
4064 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
4065out:
4066 if (obj_request)
4067 rbd_obj_request_put(obj_request);
4068 else
4069 ceph_release_page_vector(pages, page_count);
4070
4071 return ret;
4072}
4073
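/*
 * Typical use of rbd_obj_method_sync(), as in the v2 image probes
 * further down -- a minimal sketch, error handling omitted:
 *
 *	char reply[RBD_OBJ_PREFIX_LEN_MAX];
 *	int ret;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
 *				  "rbd", "get_object_prefix", NULL, 0,
 *				  reply, sizeof (reply));
 *	// on success ret is the number of reply bytes (here an
 *	// encoded string) copied into reply
 */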
4074/*
4075 * lock_rwsem must be held for read
4076 */
4077static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
4078{
4079 DEFINE_WAIT(wait);
4080
4081 do {
4082 /*
4083 * Note the use of mod_delayed_work() in rbd_acquire_lock()
4084 * and cancel_delayed_work() in wake_requests().
4085 */
4086 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4087 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4088 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4089 TASK_UNINTERRUPTIBLE);
4090 up_read(&rbd_dev->lock_rwsem);
4091 schedule();
4092 down_read(&rbd_dev->lock_rwsem);
4093 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
4094 finish_wait(&rbd_dev->lock_waitq, &wait);
4095}
4096
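/*
 * The loop above is the usual open-coded wait: the exclusive waiter is
 * queued in TASK_UNINTERRUPTIBLE before lock_rwsem is dropped, so a
 * wake_up() that arrives between up_read() and schedule() is not lost.
 */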
4097static void rbd_queue_workfn(struct work_struct *work)
4098{
4099 struct request *rq = blk_mq_rq_from_pdu(work);
4100 struct rbd_device *rbd_dev = rq->q->queuedata;
4101 struct rbd_img_request *img_request;
4102 struct ceph_snap_context *snapc = NULL;
4103 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4104 u64 length = blk_rq_bytes(rq);
4105 enum obj_operation_type op_type;
4106 u64 mapping_size;
4107 bool must_be_locked = false;
4108 int result;
4109
4110 if (rq->cmd_type != REQ_TYPE_FS) {
4111 dout("%s: non-fs request type %d\n", __func__,
4112 (int) rq->cmd_type);
4113 result = -EIO;
4114 goto err;
4115 }
4116
4117 if (req_op(rq) == REQ_OP_DISCARD)
4118 op_type = OBJ_OP_DISCARD;
4119 else if (req_op(rq) == REQ_OP_WRITE)
4120 op_type = OBJ_OP_WRITE;
4121 else
4122 op_type = OBJ_OP_READ;
4123
4124 /* Ignore/skip any zero-length requests */
4125
4126 if (!length) {
4127 dout("%s: zero-length request\n", __func__);
4128 result = 0;
4129 goto err_rq;
4130 }
4131
4132 /* Only reads are allowed to a read-only device */
4133
4134 if (op_type != OBJ_OP_READ) {
4135 if (rbd_dev->mapping.read_only) {
4136 result = -EROFS;
4137 goto err_rq;
4138 }
4139 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4140 }
4141
4142 /*
4143 * Quit early if the mapped snapshot no longer exists. It's
4144 * still possible the snapshot will have disappeared by the
4145 * time our request arrives at the osd, but there's no sense in
4146 * sending it if we already know.
4147 */
4148 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4149 dout("request for non-existent snapshot");
4150 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4151 result = -ENXIO;
4152 goto err_rq;
4153 }
4154
4155 if (offset && length > U64_MAX - offset + 1) {
4156 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4157 length);
4158 result = -EINVAL;
4159 goto err_rq; /* Shouldn't happen */
4160 }
4161
4162 blk_mq_start_request(rq);
4163
4164 down_read(&rbd_dev->header_rwsem);
4165 mapping_size = rbd_dev->mapping.size;
4166 if (op_type != OBJ_OP_READ) {
4167 snapc = rbd_dev->header.snapc;
4168 ceph_get_snap_context(snapc);
4169 must_be_locked = rbd_is_lock_supported(rbd_dev);
4170 }
4171 up_read(&rbd_dev->header_rwsem);
4172
4173 if (offset + length > mapping_size) {
4174 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4175 length, mapping_size);
4176 result = -EIO;
4177 goto err_rq;
4178 }
4179
4180 if (must_be_locked) {
4181 down_read(&rbd_dev->lock_rwsem);
4182 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4183 rbd_wait_state_locked(rbd_dev);
4184 }
4185
4186 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4187 snapc);
4188 if (!img_request) {
4189 result = -ENOMEM;
4190 goto err_unlock;
4191 }
4192 img_request->rq = rq;
4193 snapc = NULL; /* img_request consumes a ref */
4194
4195 if (op_type == OBJ_OP_DISCARD)
4196 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4197 NULL);
4198 else
4199 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4200 rq->bio);
4201 if (result)
4202 goto err_img_request;
4203
4204 result = rbd_img_request_submit(img_request);
4205 if (result)
4206 goto err_img_request;
4207
4208 if (must_be_locked)
4209 up_read(&rbd_dev->lock_rwsem);
4210 return;
4211
4212err_img_request:
4213 rbd_img_request_put(img_request);
4214err_unlock:
4215 if (must_be_locked)
4216 up_read(&rbd_dev->lock_rwsem);
4217err_rq:
4218 if (result)
4219 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4220 obj_op_name(op_type), length, offset, result);
4221 ceph_put_snap_context(snapc);
4222err:
4223 blk_mq_end_request(rq, result);
4224}
4225
4226static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4227 const struct blk_mq_queue_data *bd)
4228{
4229 struct request *rq = bd->rq;
4230 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4231
4232 queue_work(rbd_wq, work);
4233 return BLK_MQ_RQ_QUEUE_OK;
4234}
4235
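/*
 * How the request pdu is used here: rbd_init_disk() below sets
 * tag_set.cmd_size = sizeof(struct work_struct), so every request
 * carries an embedded work item.  rbd_init_request() initializes it
 * once per request, and rbd_queue_rq() just converts rq -> pdu and
 * queues it, deferring all real work to rbd_queue_workfn().
 */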
4236static void rbd_free_disk(struct rbd_device *rbd_dev)
4237{
4238 struct gendisk *disk = rbd_dev->disk;
4239
4240 if (!disk)
4241 return;
4242
4243 rbd_dev->disk = NULL;
4244 if (disk->flags & GENHD_FL_UP) {
4245 del_gendisk(disk);
4246 if (disk->queue)
4247 blk_cleanup_queue(disk->queue);
4248 blk_mq_free_tag_set(&rbd_dev->tag_set);
4249 }
4250 put_disk(disk);
4251}
4252
4253static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4254 const char *object_name,
4255 u64 offset, u64 length, void *buf)
4256
4257{
4258 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4259 struct rbd_obj_request *obj_request;
4260 struct page **pages = NULL;
4261 u32 page_count;
4262 size_t size;
4263 int ret;
4264
4265 page_count = (u32) calc_pages_for(offset, length);
4266 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
4267 if (IS_ERR(pages))
4268 return PTR_ERR(pages);
4269
4270 ret = -ENOMEM;
4271 obj_request = rbd_obj_request_create(object_name, offset, length,
4272 OBJ_REQUEST_PAGES);
4273 if (!obj_request)
4274 goto out;
4275
4276 obj_request->pages = pages;
4277 obj_request->page_count = page_count;
4278
4279 obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
4280 obj_request);
4281 if (!obj_request->osd_req)
4282 goto out;
4283
4284 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
4285 offset, length, 0, 0);
4286 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
4287 obj_request->pages,
4288 obj_request->length,
4289 obj_request->offset & ~PAGE_MASK,
4290 false, false);
4291 rbd_osd_req_format_read(obj_request);
4292
4293 ret = rbd_obj_request_submit(osdc, obj_request);
4294 if (ret)
4295 goto out;
4296 ret = rbd_obj_request_wait(obj_request);
4297 if (ret)
4298 goto out;
4299
4300 ret = obj_request->result;
4301 if (ret < 0)
4302 goto out;
4303
4304 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
4305 size = (size_t) obj_request->xferred;
4306 ceph_copy_from_page_vector(pages, buf, 0, size);
4307 rbd_assert(size <= (size_t)INT_MAX);
4308 ret = (int)size;
4309out:
4310 if (obj_request)
4311 rbd_obj_request_put(obj_request);
4312 else
4313 ceph_release_page_vector(pages, page_count);
4314
4315 return ret;
4316}
4317
4318/*
4319 * Read the complete header for the given rbd device. On successful
4320 * return, the rbd_dev->header field will contain up-to-date
4321 * information about the image.
4322 */
4323static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4324{
4325 struct rbd_image_header_ondisk *ondisk = NULL;
4326 u32 snap_count = 0;
4327 u64 names_size = 0;
4328 u32 want_count;
4329 int ret;
4330
4331 /*
4332 * The complete header will include an array of its 64-bit
4333 * snapshot ids, followed by the names of those snapshots as
4334 * a contiguous block of NUL-terminated strings. Note that
4335 * the number of snapshots could change by the time we read
4336 * it in, in which case we re-read it.
4337 */
4338 do {
4339 size_t size;
4340
4341 kfree(ondisk);
4342
4343 size = sizeof (*ondisk);
4344 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4345 size += names_size;
4346 ondisk = kmalloc(size, GFP_KERNEL);
4347 if (!ondisk)
4348 return -ENOMEM;
4349
4350 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_oid.name,
4351 0, size, ondisk);
4352 if (ret < 0)
4353 goto out;
4354 if ((size_t)ret < size) {
4355 ret = -ENXIO;
4356 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4357 size, ret);
4358 goto out;
4359 }
4360 if (!rbd_dev_ondisk_valid(ondisk)) {
4361 ret = -ENXIO;
4362 rbd_warn(rbd_dev, "invalid header");
4363 goto out;
4364 }
4365
4366 names_size = le64_to_cpu(ondisk->snap_names_len);
4367 want_count = snap_count;
4368 snap_count = le32_to_cpu(ondisk->snap_count);
4369 } while (snap_count != want_count);
4370
4371 ret = rbd_header_from_disk(rbd_dev, ondisk);
4372out:
4373 kfree(ondisk);
4374
4375 return ret;
4376}
4377
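/*
 * Illustrative layout of the v1 header object read above (sizes as
 * computed in the allocation loop):
 *
 *	struct rbd_image_header_ondisk ondisk;
 *	struct rbd_image_snap_ondisk snaps[snap_count];
 *	char snap_names[names_size];	(NUL-terminated, back to back)
 *
 * snap_count and names_size both come from ondisk itself, hence the
 * re-read loop when either changes between reads.
 */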
4378/*
4379 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4380 * has disappeared from the (just updated) snapshot context.
4381 */
4382static void rbd_exists_validate(struct rbd_device *rbd_dev)
4383{
4384 u64 snap_id;
4385
4386 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4387 return;
4388
4389 snap_id = rbd_dev->spec->snap_id;
4390 if (snap_id == CEPH_NOSNAP)
4391 return;
4392
4393 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4394 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4395}
4396
4397static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4398{
4399 sector_t size;
4400
4401 /*
4402 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4403 * try to update its size. If REMOVING is set, updating size
4404 * is just useless work since the device can't be opened.
4405 */
4406 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4407 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4408 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4409 dout("setting size to %llu sectors", (unsigned long long)size);
4410 set_capacity(rbd_dev->disk, size);
4411 revalidate_disk(rbd_dev->disk);
4412 }
4413}
4414
4415static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4416{
4417 u64 mapping_size;
4418 int ret;
4419
4420 down_write(&rbd_dev->header_rwsem);
4421 mapping_size = rbd_dev->mapping.size;
4422
4423 ret = rbd_dev_header_info(rbd_dev);
4424 if (ret)
4425 goto out;
4426
4427 /*
4428 * If there is a parent, see if it has disappeared due to the
4429 * mapped image getting flattened.
4430 */
4431 if (rbd_dev->parent) {
4432 ret = rbd_dev_v2_parent_info(rbd_dev);
4433 if (ret)
4434 goto out;
4435 }
4436
4437 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
4438 rbd_dev->mapping.size = rbd_dev->header.image_size;
4439 } else {
4440 /* validate mapped snapshot's EXISTS flag */
4441 rbd_exists_validate(rbd_dev);
4442 }
4443
4444out:
4445 up_write(&rbd_dev->header_rwsem);
4446 if (!ret && mapping_size != rbd_dev->mapping.size)
4447 rbd_dev_update_size(rbd_dev);
4448
4449 return ret;
4450}
4451
4452static int rbd_init_request(void *data, struct request *rq,
4453 unsigned int hctx_idx, unsigned int request_idx,
4454 unsigned int numa_node)
4455{
4456 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4457
4458 INIT_WORK(work, rbd_queue_workfn);
4459 return 0;
4460}
4461
4462static struct blk_mq_ops rbd_mq_ops = {
4463 .queue_rq = rbd_queue_rq,
4464 .map_queue = blk_mq_map_queue,
4465 .init_request = rbd_init_request,
4466};
4467
4468static int rbd_init_disk(struct rbd_device *rbd_dev)
4469{
4470 struct gendisk *disk;
4471 struct request_queue *q;
4472 u64 segment_size;
4473 int err;
4474
4475 /* create gendisk info */
4476 disk = alloc_disk(single_major ?
4477 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4478 RBD_MINORS_PER_MAJOR);
4479 if (!disk)
4480 return -ENOMEM;
4481
4482 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4483 rbd_dev->dev_id);
4484 disk->major = rbd_dev->major;
4485 disk->first_minor = rbd_dev->minor;
4486 if (single_major)
4487 disk->flags |= GENHD_FL_EXT_DEVT;
4488 disk->fops = &rbd_bd_ops;
4489 disk->private_data = rbd_dev;
4490
4491 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4492 rbd_dev->tag_set.ops = &rbd_mq_ops;
4493 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4494 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4495 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4496 rbd_dev->tag_set.nr_hw_queues = 1;
4497 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4498
4499 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4500 if (err)
4501 goto out_disk;
4502
4503 q = blk_mq_init_queue(&rbd_dev->tag_set);
4504 if (IS_ERR(q)) {
4505 err = PTR_ERR(q);
4506 goto out_tag_set;
4507 }
4508
4509 queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4510 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4511
4512 /* set io sizes to object size */
4513 segment_size = rbd_obj_bytes(&rbd_dev->header);
4514 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4515 q->limits.max_sectors = queue_max_hw_sectors(q);
4516 blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
4517 blk_queue_max_segment_size(q, segment_size);
4518 blk_queue_io_min(q, segment_size);
4519 blk_queue_io_opt(q, segment_size);
4520
4521 /* enable the discard support */
4522 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4523 q->limits.discard_granularity = segment_size;
4524 q->limits.discard_alignment = segment_size;
4525 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
4526 q->limits.discard_zeroes_data = 1;
4527
4528 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4529 q->backing_dev_info.capabilities |= BDI_CAP_STABLE_WRITES;
4530
4531 disk->queue = q;
4532
4533 q->queuedata = rbd_dev;
4534
4535 rbd_dev->disk = disk;
4536
4537 return 0;
4538out_tag_set:
4539 blk_mq_free_tag_set(&rbd_dev->tag_set);
4540out_disk:
4541 put_disk(disk);
4542 return err;
4543}
4544
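/*
 * Worked example of the limits set above, assuming the common 4 MiB
 * object size (obj_order 22): segment_size is 4 MiB, so the queue
 * advertises 8192-sector (4 MiB) maximum I/Os and segments, discards
 * aligned to and granular at 4 MiB, and io_min/io_opt of 4 MiB.
 */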
4545/*
4546 sysfs
4547*/
4548
4549static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4550{
4551 return container_of(dev, struct rbd_device, dev);
4552}
4553
4554static ssize_t rbd_size_show(struct device *dev,
4555 struct device_attribute *attr, char *buf)
4556{
4557 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4558
4559 return sprintf(buf, "%llu\n",
4560 (unsigned long long)rbd_dev->mapping.size);
4561}
4562
4563/*
4564 * Note this shows the features for whatever's mapped, which is not
4565 * necessarily the base image.
4566 */
4567static ssize_t rbd_features_show(struct device *dev,
4568 struct device_attribute *attr, char *buf)
4569{
4570 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4571
4572 return sprintf(buf, "0x%016llx\n",
4573 (unsigned long long)rbd_dev->mapping.features);
4574}
4575
4576static ssize_t rbd_major_show(struct device *dev,
4577 struct device_attribute *attr, char *buf)
4578{
4579 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4580
4581 if (rbd_dev->major)
4582 return sprintf(buf, "%d\n", rbd_dev->major);
4583
4584 return sprintf(buf, "(none)\n");
4585}
4586
4587static ssize_t rbd_minor_show(struct device *dev,
4588 struct device_attribute *attr, char *buf)
4589{
4590 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4591
4592 return sprintf(buf, "%d\n", rbd_dev->minor);
4593}
4594
4595static ssize_t rbd_client_addr_show(struct device *dev,
4596 struct device_attribute *attr, char *buf)
4597{
4598 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4599 struct ceph_entity_addr *client_addr =
4600 ceph_client_addr(rbd_dev->rbd_client->client);
4601
4602 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4603 le32_to_cpu(client_addr->nonce));
4604}
4605
4606static ssize_t rbd_client_id_show(struct device *dev,
4607 struct device_attribute *attr, char *buf)
4608{
4609 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4610
4611 return sprintf(buf, "client%lld\n",
4612 ceph_client_gid(rbd_dev->rbd_client->client));
4613}
4614
4615static ssize_t rbd_cluster_fsid_show(struct device *dev,
4616 struct device_attribute *attr, char *buf)
4617{
4618 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4619
4620 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4621}
4622
4623static ssize_t rbd_pool_show(struct device *dev,
4624 struct device_attribute *attr, char *buf)
4625{
4626 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4627
4628 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4629}
4630
4631static ssize_t rbd_pool_id_show(struct device *dev,
4632 struct device_attribute *attr, char *buf)
4633{
4634 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4635
4636 return sprintf(buf, "%llu\n",
4637 (unsigned long long) rbd_dev->spec->pool_id);
4638}
4639
4640static ssize_t rbd_name_show(struct device *dev,
4641 struct device_attribute *attr, char *buf)
4642{
4643 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4644
4645 if (rbd_dev->spec->image_name)
4646 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4647
4648 return sprintf(buf, "(unknown)\n");
4649}
4650
4651static ssize_t rbd_image_id_show(struct device *dev,
4652 struct device_attribute *attr, char *buf)
4653{
4654 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4655
4656 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4657}
4658
4659/*
4660 * Shows the name of the currently-mapped snapshot (or
4661 * RBD_SNAP_HEAD_NAME for the base image).
4662 */
4663static ssize_t rbd_snap_show(struct device *dev,
4664 struct device_attribute *attr,
4665 char *buf)
4666{
4667 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4668
4669 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4670}
4671
4672static ssize_t rbd_snap_id_show(struct device *dev,
4673 struct device_attribute *attr, char *buf)
4674{
4675 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4676
4677 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4678}
4679
4680/*
4681 * For a v2 image, shows the chain of parent images, separated by empty
4682 * lines. For v1 images or if there is no parent, shows "(no parent
4683 * image)".
4684 */
4685static ssize_t rbd_parent_show(struct device *dev,
4686 struct device_attribute *attr,
4687 char *buf)
4688{
4689 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4690 ssize_t count = 0;
4691
4692 if (!rbd_dev->parent)
4693 return sprintf(buf, "(no parent image)\n");
4694
4695 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4696 struct rbd_spec *spec = rbd_dev->parent_spec;
4697
4698 count += sprintf(&buf[count], "%s"
4699 "pool_id %llu\npool_name %s\n"
4700 "image_id %s\nimage_name %s\n"
4701 "snap_id %llu\nsnap_name %s\n"
4702 "overlap %llu\n",
4703 !count ? "" : "\n", /* first? */
4704 spec->pool_id, spec->pool_name,
4705 spec->image_id, spec->image_name ?: "(unknown)",
4706 spec->snap_id, spec->snap_name,
4707 rbd_dev->parent_overlap);
4708 }
4709
4710 return count;
4711}
4712
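/*
 * Example of what the parent attribute might show for a single-level
 * clone (values made up; the format follows the sprintf above):
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1021643c9869
 *	image_name parent-image
 *	snap_id 4
 *	snap_name snap1
 *	overlap 10737418240
 */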
4713static ssize_t rbd_image_refresh(struct device *dev,
4714 struct device_attribute *attr,
4715 const char *buf,
4716 size_t size)
4717{
4718 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4719 int ret;
4720
4721 ret = rbd_dev_refresh(rbd_dev);
4722 if (ret)
4723 return ret;
4724
4725 return size;
4726}
4727
4728static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4729static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4730static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4731static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4732static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4733static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4734static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4735static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4736static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4737static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4738static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4739static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4740static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4741static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4742static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4743
4744static struct attribute *rbd_attrs[] = {
4745 &dev_attr_size.attr,
4746 &dev_attr_features.attr,
4747 &dev_attr_major.attr,
4748 &dev_attr_minor.attr,
4749 &dev_attr_client_addr.attr,
4750 &dev_attr_client_id.attr,
4751 &dev_attr_cluster_fsid.attr,
4752 &dev_attr_pool.attr,
4753 &dev_attr_pool_id.attr,
4754 &dev_attr_name.attr,
4755 &dev_attr_image_id.attr,
4756 &dev_attr_current_snap.attr,
4757 &dev_attr_snap_id.attr,
4758 &dev_attr_parent.attr,
4759 &dev_attr_refresh.attr,
4760 NULL
4761};
4762
4763static struct attribute_group rbd_attr_group = {
4764 .attrs = rbd_attrs,
4765};
4766
4767static const struct attribute_group *rbd_attr_groups[] = {
4768 &rbd_attr_group,
4769 NULL
4770};
4771
4772static void rbd_dev_release(struct device *dev);
4773
4774static struct device_type rbd_device_type = {
4775 .name = "rbd",
4776 .groups = rbd_attr_groups,
4777 .release = rbd_dev_release,
4778};
4779
4780static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4781{
4782 kref_get(&spec->kref);
4783
4784 return spec;
4785}
4786
4787static void rbd_spec_free(struct kref *kref);
4788static void rbd_spec_put(struct rbd_spec *spec)
4789{
4790 if (spec)
4791 kref_put(&spec->kref, rbd_spec_free);
4792}
4793
4794static struct rbd_spec *rbd_spec_alloc(void)
4795{
4796 struct rbd_spec *spec;
4797
4798 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4799 if (!spec)
4800 return NULL;
4801
4802 spec->pool_id = CEPH_NOPOOL;
4803 spec->snap_id = CEPH_NOSNAP;
4804 kref_init(&spec->kref);
4805
4806 return spec;
4807}
4808
4809static void rbd_spec_free(struct kref *kref)
4810{
4811 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4812
4813 kfree(spec->pool_name);
4814 kfree(spec->image_id);
4815 kfree(spec->image_name);
4816 kfree(spec->snap_name);
4817 kfree(spec);
4818}
4819
4820static void rbd_dev_free(struct rbd_device *rbd_dev)
4821{
4822 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4823 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4824
4825 ceph_oid_destroy(&rbd_dev->header_oid);
4826 ceph_oloc_destroy(&rbd_dev->header_oloc);
4827
4828 rbd_put_client(rbd_dev->rbd_client);
4829 rbd_spec_put(rbd_dev->spec);
4830 kfree(rbd_dev->opts);
4831 kfree(rbd_dev);
4832}
4833
4834static void rbd_dev_release(struct device *dev)
4835{
4836 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4837 bool need_put = !!rbd_dev->opts;
4838
4839 if (need_put) {
4840 destroy_workqueue(rbd_dev->task_wq);
4841 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4842 }
4843
4844 rbd_dev_free(rbd_dev);
4845
4846 /*
4847 * This is racy, but way better than putting module outside of
4848 * the release callback. The race window is pretty small, so
4849 * doing something similar to dm (dm-builtin.c) is overkill.
4850 */
4851 if (need_put)
4852 module_put(THIS_MODULE);
4853}
4854
4855static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4856 struct rbd_spec *spec)
4857{
4858 struct rbd_device *rbd_dev;
4859
4860 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4861 if (!rbd_dev)
4862 return NULL;
4863
4864 spin_lock_init(&rbd_dev->lock);
4865 INIT_LIST_HEAD(&rbd_dev->node);
4866 init_rwsem(&rbd_dev->header_rwsem);
4867
4868 ceph_oid_init(&rbd_dev->header_oid);
4869 ceph_oloc_init(&rbd_dev->header_oloc);
4870
4871 mutex_init(&rbd_dev->watch_mutex);
4872 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4873 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4874
4875 init_rwsem(&rbd_dev->lock_rwsem);
4876 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4877 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4878 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4879 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4880 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4881 init_waitqueue_head(&rbd_dev->lock_waitq);
4882
4883 rbd_dev->dev.bus = &rbd_bus_type;
4884 rbd_dev->dev.type = &rbd_device_type;
4885 rbd_dev->dev.parent = &rbd_root_dev;
4886 device_initialize(&rbd_dev->dev);
4887
4888 rbd_dev->rbd_client = rbdc;
4889 rbd_dev->spec = spec;
4890
4891 rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
4892 rbd_dev->layout.stripe_count = 1;
4893 rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER;
4894 rbd_dev->layout.pool_id = spec->pool_id;
4895 RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
4896
4897 return rbd_dev;
4898}
4899
4900/*
4901 * Create a mapping rbd_dev.
4902 */
4903static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4904 struct rbd_spec *spec,
4905 struct rbd_options *opts)
4906{
4907 struct rbd_device *rbd_dev;
4908
4909 rbd_dev = __rbd_dev_create(rbdc, spec);
4910 if (!rbd_dev)
4911 return NULL;
4912
4913 rbd_dev->opts = opts;
4914
4915 /* get an id and fill in device name */
4916 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4917 minor_to_rbd_dev_id(1 << MINORBITS),
4918 GFP_KERNEL);
4919 if (rbd_dev->dev_id < 0)
4920 goto fail_rbd_dev;
4921
4922 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4923 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4924 rbd_dev->name);
4925 if (!rbd_dev->task_wq)
4926 goto fail_dev_id;
4927
4928 /* we have a ref from do_rbd_add() */
4929 __module_get(THIS_MODULE);
4930
4931 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4932 return rbd_dev;
4933
4934fail_dev_id:
4935 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4936fail_rbd_dev:
4937 rbd_dev_free(rbd_dev);
4938 return NULL;
4939}
4940
4941static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4942{
4943 if (rbd_dev)
4944 put_device(&rbd_dev->dev);
4945}
4946
4947/*
4948 * Get the size and object order for an image snapshot, or if
4949 * snap_id is CEPH_NOSNAP, gets this information for the base
4950 * image.
4951 */
4952static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4953 u8 *order, u64 *snap_size)
4954{
4955 __le64 snapid = cpu_to_le64(snap_id);
4956 int ret;
4957 struct {
4958 u8 order;
4959 __le64 size;
4960 } __attribute__ ((packed)) size_buf = { 0 };
4961
4962 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
4963 "rbd", "get_size",
4964 &snapid, sizeof (snapid),
4965 &size_buf, sizeof (size_buf));
4966 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4967 if (ret < 0)
4968 return ret;
4969 if (ret < sizeof (size_buf))
4970 return -ERANGE;
4971
4972 if (order) {
4973 *order = size_buf.order;
4974 dout(" order %u", (unsigned int)*order);
4975 }
4976 *snap_size = le64_to_cpu(size_buf.size);
4977
4978 dout(" snap_id 0x%016llx snap_size = %llu\n",
4979 (unsigned long long)snap_id,
4980 (unsigned long long)*snap_size);
4981
4982 return 0;
4983}
4984
4985static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4986{
4987 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4988 &rbd_dev->header.obj_order,
4989 &rbd_dev->header.image_size);
4990}
4991
4992static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4993{
4994 void *reply_buf;
4995 int ret;
4996 void *p;
4997
4998 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4999 if (!reply_buf)
5000 return -ENOMEM;
5001
5002 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5003 "rbd", "get_object_prefix", NULL, 0,
5004 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
5005 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5006 if (ret < 0)
5007 goto out;
5008
5009 p = reply_buf;
5010 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5011 p + ret, NULL, GFP_NOIO);
5012 ret = 0;
5013
5014 if (IS_ERR(rbd_dev->header.object_prefix)) {
5015 ret = PTR_ERR(rbd_dev->header.object_prefix);
5016 rbd_dev->header.object_prefix = NULL;
5017 } else {
5018 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
5019 }
5020out:
5021 kfree(reply_buf);
5022
5023 return ret;
5024}
5025
5026static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5027 u64 *snap_features)
5028{
5029 __le64 snapid = cpu_to_le64(snap_id);
5030 struct {
5031 __le64 features;
5032 __le64 incompat;
5033 } __attribute__ ((packed)) features_buf = { 0 };
5034 u64 unsup;
5035 int ret;
5036
5037 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5038 "rbd", "get_features",
5039 &snapid, sizeof (snapid),
5040 &features_buf, sizeof (features_buf));
5041 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5042 if (ret < 0)
5043 return ret;
5044 if (ret < sizeof (features_buf))
5045 return -ERANGE;
5046
5047 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5048 if (unsup) {
5049 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5050 unsup);
5051 return -ENXIO;
5052 }
5053
5054 *snap_features = le64_to_cpu(features_buf.features);
5055
5056 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5057 (unsigned long long)snap_id,
5058 (unsigned long long)*snap_features,
5059 (unsigned long long)le64_to_cpu(features_buf.incompat));
AE
5060
5061 return 0;
5062}
5063
5064static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5065{
5066 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5067 &rbd_dev->header.features);
5068}
5069
5070static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5071{
5072 struct rbd_spec *parent_spec;
5073 size_t size;
5074 void *reply_buf = NULL;
5075 __le64 snapid;
5076 void *p;
5077 void *end;
5078 u64 pool_id;
5079 char *image_id;
5080 u64 snap_id;
5081 u64 overlap;
5082 int ret;
5083
5084 parent_spec = rbd_spec_alloc();
5085 if (!parent_spec)
5086 return -ENOMEM;
5087
5088 size = sizeof (__le64) + /* pool_id */
5089 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
5090 sizeof (__le64) + /* snap_id */
5091 sizeof (__le64); /* overlap */
5092 reply_buf = kmalloc(size, GFP_KERNEL);
5093 if (!reply_buf) {
5094 ret = -ENOMEM;
5095 goto out_err;
5096 }
5097
5098 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5099 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
5100 "rbd", "get_parent",
5101 &snapid, sizeof (snapid),
5102 reply_buf, size);
5103 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5104 if (ret < 0)
5105 goto out_err;
5106
5107 p = reply_buf;
5108 end = reply_buf + ret;
5109 ret = -ERANGE;
5110 ceph_decode_64_safe(&p, end, pool_id, out_err);
5111 if (pool_id == CEPH_NOPOOL) {
5112 /*
5113 * Either the parent never existed, or we have
5114 * record of it but the image got flattened so it no
5115 * longer has a parent. When the parent of a
5116 * layered image disappears we immediately set the
5117 * overlap to 0. The effect of this is that all new
5118 * requests will be treated as if the image had no
5119 * parent.
5120 */
5121 if (rbd_dev->parent_overlap) {
5122 rbd_dev->parent_overlap = 0;
5123 rbd_dev_parent_put(rbd_dev);
5124 pr_info("%s: clone image has been flattened\n",
5125 rbd_dev->disk->disk_name);
5126 }
5127
5128 goto out; /* No parent? No problem. */
5129 }
5130
5131 /* The ceph file layout needs to fit pool id in 32 bits */
5132
5133 ret = -EIO;
5134 if (pool_id > (u64)U32_MAX) {
5135 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5136 (unsigned long long)pool_id, U32_MAX);
5137 goto out_err;
5138 }
5139
5140 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5141 if (IS_ERR(image_id)) {
5142 ret = PTR_ERR(image_id);
5143 goto out_err;
5144 }
5145 ceph_decode_64_safe(&p, end, snap_id, out_err);
5146 ceph_decode_64_safe(&p, end, overlap, out_err);
5147
5148 /*
5149 * The parent won't change (except when the clone is
5150 * flattened, already handled that). So we only need to
5151 * record the parent spec we have not already done so.
5152 */
5153 if (!rbd_dev->parent_spec) {
5154 parent_spec->pool_id = pool_id;
5155 parent_spec->image_id = image_id;
5156 parent_spec->snap_id = snap_id;
70cf49cf
AE
5157 rbd_dev->parent_spec = parent_spec;
5158 parent_spec = NULL; /* rbd_dev now owns this */
fbba11b3
ID
5159 } else {
5160 kfree(image_id);
3b5cf2a2
AE
5161 }
5162
5163 /*
cf32bd9c
ID
5164 * We always update the parent overlap. If it's zero we issue
5165 * a warning, as we will proceed as if there was no parent.
3b5cf2a2 5166 */
3b5cf2a2 5167 if (!overlap) {
3b5cf2a2 5168 if (parent_spec) {
cf32bd9c
ID
5169 /* refresh, careful to warn just once */
5170 if (rbd_dev->parent_overlap)
5171 rbd_warn(rbd_dev,
5172 "clone now standalone (overlap became 0)");
3b5cf2a2 5173 } else {
cf32bd9c
ID
5174 /* initial probe */
5175 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
3b5cf2a2 5176 }
70cf49cf 5177 }
cf32bd9c
ID
5178 rbd_dev->parent_overlap = overlap;
5179
86b00e0d
AE
5180out:
5181 ret = 0;
5182out_err:
5183 kfree(reply_buf);
5184 rbd_spec_put(parent_spec);
5185
5186 return ret;
5187}
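
/*
 * For reference, the "get_parent" reply decoded above has this
 * layout (a sketch implied by the decode sequence, not an
 * authoritative protocol description):
 *
 *	__le64	pool_id		parent pool (CEPH_NOPOOL if no parent
 *				exists or the image was flattened)
 *	string	image_id	__le32 length, then that many bytes
 *	__le64	snap_id		parent snapshot the clone was made from
 *	__le64	overlap		bytes shared with the parent, from offset 0
 */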

static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
{
	struct {
		__le64 stripe_unit;
		__le64 stripe_count;
	} __attribute__ ((packed)) striping_info_buf = { 0 };
	size_t size = sizeof (striping_info_buf);
	void *p;
	u64 obj_size;
	u64 stripe_unit;
	u64 stripe_count;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				"rbd", "get_stripe_unit_count", NULL, 0,
				(char *)&striping_info_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	if (ret < size)
		return -ERANGE;

	/*
	 * We don't actually support the "fancy striping" feature
	 * (STRIPINGV2) yet, but if the striping sizes are the
	 * defaults the behavior is the same as before.  So find
	 * out, and only fail if the image has non-default values.
	 */
	ret = -EINVAL;
	obj_size = (u64)1 << rbd_dev->header.obj_order;
	p = &striping_info_buf;
	stripe_unit = ceph_decode_64(&p);
	if (stripe_unit != obj_size) {
		rbd_warn(rbd_dev, "unsupported stripe unit "
			"(got %llu want %llu)",
			stripe_unit, obj_size);
		return -EINVAL;
	}
	stripe_count = ceph_decode_64(&p);
	if (stripe_count != 1) {
		rbd_warn(rbd_dev, "unsupported stripe count "
			"(got %llu want 1)", stripe_count);
		return -EINVAL;
	}
	rbd_dev->header.stripe_unit = stripe_unit;
	rbd_dev->header.stripe_count = stripe_count;

	return 0;
}
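
/*
 * Example of the only striping this function accepts, for an image
 * whose obj_order is 22 (4 MB objects; values illustrative):
 *
 *	stripe_unit  = 4194304	(equal to the object size)
 *	stripe_count = 1	(each stripe unit is one whole object)
 */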

static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	void *p;
	void *end;
	size_t size;
	void *reply_buf = NULL;
	size_t len = 0;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
				"rbd", "dir_get_name",
				image_id, image_id_size,
				reply_buf, size);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = reply_buf + ret;

	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}

static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const char *snap_name;
	u32 which = 0;

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which < snapc->num_snaps) {
		if (!strcmp(name, snap_name))
			return snapc->snaps[which];
		snap_name += strlen(snap_name) + 1;
		which++;
	}
	return CEPH_NOSNAP;
}
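
/*
 * In a format 1 header the snapshot names are packed into a single
 * buffer as consecutive NUL-terminated strings, parallel to
 * snapc->snaps[].  For instance (made-up values):
 *
 *	snaps[]    = { 12, 10, 4 }
 *	snap_names = "newest\0middle\0oldest\0"
 *
 * so rbd_v1_snap_id_by_name(rbd_dev, "middle") returns 10.
 */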

static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u32 which;
	bool found = false;
	u64 snap_id;

	for (which = 0; !found && which < snapc->num_snaps; which++) {
		const char *snap_name;

		snap_id = snapc->snaps[which];
		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
		if (IS_ERR(snap_name)) {
			/* ignore no-longer existing snapshots */
			if (PTR_ERR(snap_name) == -ENOENT)
				continue;
			else
				break;
		}
		found = !strcmp(name, snap_name);
		kfree(snap_name);
	}
	return found ? snap_id : CEPH_NOSNAP;
}

/*
 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
 * no snapshot by that name is found, or if an error occurs.
 */
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
{
	if (rbd_dev->image_format == 1)
		return rbd_v1_snap_id_by_name(rbd_dev, name);

	return rbd_v2_snap_id_by_name(rbd_dev, name);
}

/*
 * An image being mapped will have everything but the snap id.
 */
static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;

	rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
	rbd_assert(spec->image_id && spec->image_name);
	rbd_assert(spec->snap_name);

	if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
		u64 snap_id;

		snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
		if (snap_id == CEPH_NOSNAP)
			return -ENOENT;

		spec->snap_id = snap_id;
	} else {
		spec->snap_id = CEPH_NOSNAP;
	}

	return 0;
}

/*
 * A parent image will have all ids but none of the names.
 *
 * All names in an rbd spec are dynamically allocated.  It's OK if we
 * can't figure out the name for an image id.
 */
static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_spec *spec = rbd_dev->spec;
	const char *pool_name;
	const char *image_name;
	const char *snap_name;
	int ret;

	rbd_assert(spec->pool_id != CEPH_NOPOOL);
	rbd_assert(spec->image_id);
	rbd_assert(spec->snap_id != CEPH_NOSNAP);

	/* Get the pool name; we have to make our own copy of this */

	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
	if (!pool_name) {
		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
		return -EIO;
	}
	pool_name = kstrdup(pool_name, GFP_KERNEL);
	if (!pool_name)
		return -ENOMEM;

	/* Fetch the image name; tolerate failure here */

	image_name = rbd_dev_image_name(rbd_dev);
	if (!image_name)
		rbd_warn(rbd_dev, "unable to get image name");

	/* Fetch the snapshot name */

	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out_err;
	}

	spec->pool_name = pool_name;
	spec->image_name = image_name;
	spec->snap_name = snap_name;

	return 0;

out_err:
	kfree(image_name);
	kfree(pool_name);
	return ret;
}

static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	void *reply_buf;
	void *p;
	void *end;
	u64 seq;
	u32 snap_count;
	struct ceph_snap_context *snapc;
	u32 i;

	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
	size = sizeof (__le64) + sizeof (__le32) +
			RBD_MAX_SNAP_COUNT * sizeof (__le64);
	reply_buf = kzalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				"rbd", "get_snapcontext", NULL, 0,
				reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = reply_buf + ret;
	ret = -ERANGE;
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);

	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
				 / sizeof (u64)) {
		ret = -EINVAL;
		goto out;
	}
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;
	ret = 0;

	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc) {
		ret = -ENOMEM;
		goto out;
	}
	snapc->seq = seq;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);

	ceph_put_snap_context(rbd_dev->header.snapc);
	rbd_dev->header.snapc = snapc;

	dout("  snap context seq = %llu, snap_count = %u\n",
		(unsigned long long)seq, (unsigned int)snap_count);
out:
	kfree(reply_buf);

	return ret;
}
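
/*
 * Sketch of the "get_snapcontext" reply consumed above, as implied
 * by the decode calls:
 *
 *	__le64	seq			highest snapshot id seen so far
 *	__le32	snap_count		number of ids that follow
 *	__le64	snaps[snap_count]	ids, copied into snapc->snaps[]
 */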

static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	size_t size;
	void *reply_buf;
	__le64 snapid;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snapid = cpu_to_le64(snap_id);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name,
				"rbd", "get_snapshot_name",
				&snapid, sizeof (snapid),
				reply_buf, size);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0) {
		snap_name = ERR_PTR(ret);
		goto out;
	}

	p = reply_buf;
	end = reply_buf + ret;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name))
		goto out;

	dout("  snap_id 0x%016llx snap_name = %s\n",
		(unsigned long long)snap_id, snap_name);
out:
	kfree(reply_buf);

	return snap_name;
}

static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
{
	bool first_time = rbd_dev->header.object_prefix == NULL;
	int ret;

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		return ret;

	if (first_time) {
		ret = rbd_dev_v2_header_onetime(rbd_dev);
		if (ret)
			return ret;
	}

	ret = rbd_dev_v2_snap_context(rbd_dev);
	if (ret && first_time) {
		kfree(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	}

	return ret;
}

static int rbd_dev_header_info(struct rbd_device *rbd_dev)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_header_info(rbd_dev);

	return rbd_dev_v2_header_info(rbd_dev);
}

/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}

/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
	char *dup;
	size_t len;

	len = next_token(buf);
	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
	if (!dup)
		return NULL;
	*(dup + len) = '\0';
	*buf += len;

	if (lenp)
		*lenp = len;

	return dup;
}
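
/*
 * A minimal usage sketch for next_token()/dup_token(), compiled out;
 * the buffer contents are made-up values:
 */
#if 0
static void token_demo(void)
{
	const char *buf = "  rbd pool image";
	size_t len;
	char *tok;

	tok = dup_token(&buf, &len);	/* tok = "rbd", len = 3 */
	kfree(tok);
	len = next_token(&buf);		/* len = 4, buf at "pool image" */
}
#endif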

/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters, which return dynamically-allocated
 * structures:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *      Address of an rbd options pointer.  Fully initialized by
 *      this function; caller must release with kfree().
 *  spec
 *      Address of an rbd image specification pointer.  Fully
 *      initialized by this function based on parsed options.
 *      Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
 */
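/*
 * For example, a complete add request might look like this (all
 * values illustrative):
 *
 *	$ echo "1.2.3.4:6789,1.2.3.5:6789 name=admin,secret=<key> \
 *		rbd myimage mysnap" > /sys/bus/rbd/add
 */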
859c31df 5667static int rbd_add_parse_args(const char *buf,
dc79b113 5668 struct ceph_options **ceph_opts,
859c31df
AE
5669 struct rbd_options **opts,
5670 struct rbd_spec **rbd_spec)
e28fff26 5671{
d22f76e7 5672 size_t len;
859c31df 5673 char *options;
0ddebc0c 5674 const char *mon_addrs;
ecb4dc22 5675 char *snap_name;
0ddebc0c 5676 size_t mon_addrs_size;
859c31df 5677 struct rbd_spec *spec = NULL;
4e9afeba 5678 struct rbd_options *rbd_opts = NULL;
859c31df 5679 struct ceph_options *copts;
dc79b113 5680 int ret;
e28fff26
AE
5681
5682 /* The first four tokens are required */
5683
7ef3214a 5684 len = next_token(&buf);
4fb5d671
AE
5685 if (!len) {
5686 rbd_warn(NULL, "no monitor address(es) provided");
5687 return -EINVAL;
5688 }
0ddebc0c 5689 mon_addrs = buf;
f28e565a 5690 mon_addrs_size = len + 1;
7ef3214a 5691 buf += len;
a725f65e 5692
dc79b113 5693 ret = -EINVAL;
f28e565a
AE
5694 options = dup_token(&buf, NULL);
5695 if (!options)
dc79b113 5696 return -ENOMEM;
4fb5d671
AE
5697 if (!*options) {
5698 rbd_warn(NULL, "no options provided");
5699 goto out_err;
5700 }
e28fff26 5701
859c31df
AE
5702 spec = rbd_spec_alloc();
5703 if (!spec)
f28e565a 5704 goto out_mem;
859c31df
AE
5705
5706 spec->pool_name = dup_token(&buf, NULL);
5707 if (!spec->pool_name)
5708 goto out_mem;
4fb5d671
AE
5709 if (!*spec->pool_name) {
5710 rbd_warn(NULL, "no pool name provided");
5711 goto out_err;
5712 }
e28fff26 5713
69e7a02f 5714 spec->image_name = dup_token(&buf, NULL);
859c31df 5715 if (!spec->image_name)
f28e565a 5716 goto out_mem;
4fb5d671
AE
5717 if (!*spec->image_name) {
5718 rbd_warn(NULL, "no image name provided");
5719 goto out_err;
5720 }
d4b125e9 5721
f28e565a
AE
5722 /*
5723 * Snapshot name is optional; default is to use "-"
5724 * (indicating the head/no snapshot).
5725 */
3feeb894 5726 len = next_token(&buf);
820a5f3e 5727 if (!len) {
3feeb894
AE
5728 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5729 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 5730 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 5731 ret = -ENAMETOOLONG;
f28e565a 5732 goto out_err;
849b4260 5733 }
ecb4dc22
AE
5734 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5735 if (!snap_name)
f28e565a 5736 goto out_mem;
ecb4dc22
AE
5737 *(snap_name + len) = '\0';
5738 spec->snap_name = snap_name;
e5c35534 5739
0ddebc0c 5740 /* Initialize all rbd options to the defaults */
e28fff26 5741
4e9afeba
AE
5742 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5743 if (!rbd_opts)
5744 goto out_mem;
5745
5746 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
b5584180 5747 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
d22f76e7 5748
859c31df 5749 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 5750 mon_addrs + mon_addrs_size - 1,
4e9afeba 5751 parse_rbd_opts_token, rbd_opts);
859c31df
AE
5752 if (IS_ERR(copts)) {
5753 ret = PTR_ERR(copts);
dc79b113
AE
5754 goto out_err;
5755 }
859c31df
AE
5756 kfree(options);
5757
5758 *ceph_opts = copts;
4e9afeba 5759 *opts = rbd_opts;
859c31df 5760 *rbd_spec = spec;
0ddebc0c 5761
dc79b113 5762 return 0;
f28e565a 5763out_mem:
dc79b113 5764 ret = -ENOMEM;
d22f76e7 5765out_err:
859c31df
AE
5766 kfree(rbd_opts);
5767 rbd_spec_put(spec);
f28e565a 5768 kfree(options);
d22f76e7 5769
dc79b113 5770 return ret;
a725f65e
AE
5771}

/*
 * Return pool id (>= 0) or a negative error code.
 */
static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
{
	struct ceph_options *opts = rbdc->client->options;
	u64 newest_epoch;
	int tries = 0;
	int ret;

again:
	ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
	if (ret == -ENOENT && tries++ < 1) {
		ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
					    &newest_epoch);
		if (ret < 0)
			return ret;

		if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
			ceph_osdc_maybe_request_map(&rbdc->client->osdc);
			(void) ceph_monc_wait_osdmap(&rbdc->client->monc,
						     newest_epoch,
						     opts->mount_timeout);
			goto again;
		} else {
			/* the osdmap we have is new enough */
			return -ENOENT;
		}
	}

	return ret;
}

/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will fill in the given rbd_dev's image_id field if
 * the id can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	char *image_id;

	/*
	 * When probing a parent image, the image id is already
	 * known (and the image name likely is not).  There's no
	 * need to fetch the image id again in this case.  We
	 * do still need to set the image format though.
	 */
	if (rbd_dev->spec->image_id) {
		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

		return 0;
	}

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
	object_name = kmalloc(size, GFP_NOIO);
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	/* If it doesn't exist we'll assume it's a format 1 image */

	ret = rbd_obj_method_sync(rbd_dev, object_name,
				"rbd", "get_id", NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret == -ENOENT) {
		image_id = kstrdup("", GFP_KERNEL);
		ret = image_id ? 0 : -ENOMEM;
		if (!ret)
			rbd_dev->image_format = 1;
	} else if (ret >= 0) {
		void *p = response;

		image_id = ceph_extract_encoded_string(&p, p + ret,
						NULL, GFP_NOIO);
		ret = PTR_ERR_OR_ZERO(image_id);
		if (!ret)
			rbd_dev->image_format = 2;
	}

	if (!ret) {
		rbd_dev->spec->image_id = image_id;
		dout("image_id is %s\n", image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}
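
/*
 * E.g. for an image named "foo" the id object consulted above is
 * named RBD_ID_PREFIX "foo" (i.e. "rbd_id.foo", with the prefix as
 * defined in rbd_types.h), and a successful "get_id" reply such as
 * "1014b76b8b4567" becomes rbd_dev->spec->image_id (values
 * illustrative).
 */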

/*
 * Undo whatever state changes are made by the v1 or v2 header info
 * call.
 */
static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
	struct rbd_image_header *header;

	rbd_dev_parent_put(rbd_dev);

	/* Free dynamic fields from the header, then zero it out */

	header = &rbd_dev->header;
	ceph_put_snap_context(header->snapc);
	kfree(header->snap_sizes);
	kfree(header->snap_names);
	kfree(header->object_prefix);
	memset(header, 0, sizeof (*header));
}

static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret)
		goto out_err;

	/*
	 * Get and check the features for the image.  Currently the
	 * features are assumed to never change.
	 */
	ret = rbd_dev_v2_features(rbd_dev);
	if (ret)
		goto out_err;

	/* If the image supports fancy striping, get its parameters */

	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
		ret = rbd_dev_v2_striping_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}
	/* No support for crypto and compression type format 2 images */

	return 0;
out_err:
	rbd_dev->header.features = 0;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}

/*
 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
 * rbd_dev_image_probe() recursion depth, which means it's also the
 * length of the already discovered part of the parent chain.
 */
static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
{
	struct rbd_device *parent = NULL;
	int ret;

	if (!rbd_dev->parent_spec)
		return 0;

	if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
		pr_info("parent chain is too long (%d)\n", depth);
		ret = -EINVAL;
		goto out_err;
	}

	parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
	if (!parent) {
		ret = -ENOMEM;
		goto out_err;
	}

	/*
	 * Images related by parent/child relationships always share
	 * rbd_client and spec/parent_spec, so bump their refcounts.
	 */
	__rbd_get_client(rbd_dev->rbd_client);
	rbd_spec_get(rbd_dev->parent_spec);

	ret = rbd_dev_image_probe(parent, depth);
	if (ret < 0)
		goto out_err;

	rbd_dev->parent = parent;
	atomic_set(&rbd_dev->parent_ref, 1);
	return 0;

out_err:
	rbd_dev_unparent(rbd_dev);
	rbd_dev_destroy(parent);
	return ret;
}
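
/*
 * Example: mapping a clone of a clone walks a chain like
 *
 *	mapped image -> parent -> grandparent
 *
 * probing at depths 0, 1 and 2; the probe fails with -EINVAL as soon
 * as depth would exceed RBD_MAX_PARENT_CHAIN_LEN.
 */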

/*
 * rbd_dev->header_rwsem must be locked for write and will be unlocked
 * upon return.
 */
static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
	int ret;

	/* Record our major and minor device numbers. */

	if (!single_major) {
		ret = register_blkdev(0, rbd_dev->name);
		if (ret < 0)
			goto err_out_unlock;

		rbd_dev->major = ret;
		rbd_dev->minor = 0;
	} else {
		rbd_dev->major = rbd_major;
		rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
	}

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_dev_mapping_set(rbd_dev);
	if (ret)
		goto err_out_disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);

	dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
	ret = device_add(&rbd_dev->dev);
	if (ret)
		goto err_out_mapping;

	/* Everything's ready.  Announce the disk to the world. */

	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	up_write(&rbd_dev->header_rwsem);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);

	add_disk(rbd_dev->disk);
	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
		rbd_dev->header.features);

	return ret;

err_out_mapping:
	rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_unlock:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}

static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
	struct rbd_spec *spec = rbd_dev->spec;
	int ret;

	/* Record the header object name for this rbd image. */

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

	rbd_dev->header_oloc.pool = rbd_dev->layout.pool_id;
	if (rbd_dev->image_format == 1)
		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
				       spec->image_name, RBD_SUFFIX);
	else
		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
				       RBD_HEADER_PREFIX, spec->image_id);

	return ret;
}
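
/*
 * Resulting header object names, for example (values illustrative,
 * with RBD_SUFFIX and RBD_HEADER_PREFIX as defined in rbd_types.h):
 *
 *	format 1:  "foo.rbd"			(image name + RBD_SUFFIX)
 *	format 2:  "rbd_header.<image id>"	(RBD_HEADER_PREFIX + id)
 */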

static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
	rbd_dev_unprobe(rbd_dev);
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	rbd_dev_destroy(rbd_dev);
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
{
	int ret;

	/*
	 * Get the id from the image id object.  Unless there's an
	 * error, rbd_dev->spec->image_id will be filled in with
	 * a dynamically-allocated string, and rbd_dev->image_format
	 * will be set to either 1 or 2.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_header_name(rbd_dev);
	if (ret)
		goto err_out_format;

	if (!depth) {
		ret = rbd_register_watch(rbd_dev);
		if (ret) {
			if (ret == -ENOENT)
				pr_info("image %s/%s does not exist\n",
					rbd_dev->spec->pool_name,
					rbd_dev->spec->image_name);
			goto err_out_format;
		}
	}

	ret = rbd_dev_header_info(rbd_dev);
	if (ret)
		goto err_out_watch;

	/*
	 * If this image is the one being mapped, we have pool name and
	 * id, image name and id, and snap name - need to fill snap id.
	 * Otherwise this is a parent image, identified by pool, image
	 * and snap ids - need to fill in names for those ids.
	 */
	if (!depth)
		ret = rbd_spec_fill_snap_id(rbd_dev);
	else
		ret = rbd_spec_fill_names(rbd_dev);
	if (ret) {
		if (ret == -ENOENT)
			pr_info("snap %s/%s@%s does not exist\n",
				rbd_dev->spec->pool_name,
				rbd_dev->spec->image_name,
				rbd_dev->spec->snap_name);
		goto err_out_probe;
	}

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto err_out_probe;

		/*
		 * Need to warn users if this image is the one being
		 * mapped and has a parent.
		 */
		if (!depth && rbd_dev->parent_spec)
			rbd_warn(rbd_dev,
				 "WARNING: kernel layering is EXPERIMENTAL!");
	}

	ret = rbd_dev_probe_parent(rbd_dev, depth);
	if (ret)
		goto err_out_probe;

	dout("discovered format %u image, header name is %s\n",
		rbd_dev->image_format, rbd_dev->header_oid.name);
	return 0;

err_out_probe:
	rbd_dev_unprobe(rbd_dev);
err_out_watch:
	if (!depth)
		rbd_unregister_watch(rbd_dev);
err_out_format:
	rbd_dev->image_format = 0;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;
	return ret;
}

static ssize_t do_rbd_add(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	bool read_only;
	int rc;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto out;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}

	/* pick the pool */
	rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
	if (rc < 0) {
		if (rc == -ENOENT)
			pr_info("pool %s does not exist\n", spec->pool_name);
		goto err_out_client;
	}
	spec->pool_id = (u64)rc;

	rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
	if (!rbd_dev) {
		rc = -ENOMEM;
		goto err_out_client;
	}
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */
	rbd_opts = NULL;	/* rbd_dev now owns this */

	down_write(&rbd_dev->header_rwsem);
	rc = rbd_dev_image_probe(rbd_dev, 0);
	if (rc < 0)
		goto err_out_rbd_dev;

	/* If we are mapping a snapshot it must be marked read-only */

	read_only = rbd_dev->opts->read_only;
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		read_only = true;
	rbd_dev->mapping.read_only = read_only;

	rc = rbd_dev_device_setup(rbd_dev);
	if (rc) {
		/*
		 * rbd_unregister_watch() can't be moved into
		 * rbd_dev_image_release() without refactoring, see
		 * commit 1f3ef78861ac.
		 */
		rbd_unregister_watch(rbd_dev);
		rbd_dev_image_release(rbd_dev);
		goto out;
	}

	rc = count;
out:
	module_put(THIS_MODULE);
	return rc;

err_out_rbd_dev:
	up_write(&rbd_dev->header_rwsem);
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	rbd_spec_put(spec);
	kfree(rbd_opts);
	goto out;
}

static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_add(bus, buf, count);
}

static ssize_t rbd_add_single_major(struct bus_type *bus,
				    const char *buf,
				    size_t count)
{
	return do_rbd_add(bus, buf, count);
}

static void rbd_dev_device_release(struct rbd_device *rbd_dev)
{
	rbd_free_disk(rbd_dev);

	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);
	spin_unlock(&rbd_dev_list_lock);

	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	device_del(&rbd_dev->dev);
	rbd_dev_mapping_clear(rbd_dev);
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		first->parent = NULL;
		first->parent_overlap = 0;

		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}

static ssize_t do_rbd_remove(struct bus_type *bus,
			     const char *buf,
			     size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct list_head *tmp;
	int dev_id;
	unsigned long ul;
	bool already = false;
	int ret;

	ret = kstrtoul(buf, 10, &ul);
	if (ret)
		return ret;

	/* convert to int; abort if we lost anything in the conversion */
	dev_id = (int)ul;
	if (dev_id != ul)
		return -EINVAL;

	ret = -ENOENT;
	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			ret = 0;
			break;
		}
	}
	if (!ret) {
		spin_lock_irq(&rbd_dev->lock);
		if (rbd_dev->open_count)
			ret = -EBUSY;
		else
			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
							&rbd_dev->flags);
		spin_unlock_irq(&rbd_dev->lock);
	}
	spin_unlock(&rbd_dev_list_lock);
	if (ret < 0 || already)
		return ret;

	down_write(&rbd_dev->lock_rwsem);
	if (__rbd_is_lock_owner(rbd_dev))
		rbd_unlock(rbd_dev);
	up_write(&rbd_dev->lock_rwsem);
	rbd_unregister_watch(rbd_dev);

	/*
	 * Don't free anything from rbd_dev->disk until after all
	 * notifies are completely processed.  Otherwise
	 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
	 * in a potential use after free of rbd_dev->disk or rbd_dev.
	 */
	rbd_dev_device_release(rbd_dev);
	rbd_dev_image_release(rbd_dev);

	return count;
}

static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	if (single_major)
		return -EINVAL;

	return do_rbd_remove(bus, buf, count);
}

static ssize_t rbd_remove_single_major(struct bus_type *bus,
				       const char *buf,
				       size_t count)
{
	return do_rbd_remove(bus, buf, count);
}
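
/*
 * Removal mirrors the add path above: the device id shown under
 * /sys/bus/rbd/devices is written back, e.g. (id illustrative)
 *
 *	$ echo 0 > /sys/bus/rbd/remove
 *
 * and fails with -EBUSY while the block device is still open.
 */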

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

static int rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
	if (!rbd_obj_request_cache)
		goto out_err;

	rbd_assert(!rbd_segment_name_cache);
	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
					CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
	if (rbd_segment_name_cache)
		return 0;
out_err:
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;

	return -ENOMEM;
}

static void rbd_slab_exit(void)
{
	rbd_assert(rbd_segment_name_cache);
	kmem_cache_destroy(rbd_segment_name_cache);
	rbd_segment_name_cache = NULL;

	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");
		return -EINVAL;
	}

	rc = rbd_slab_init();
	if (rc)
		return rc;

	/*
	 * The number of active work items is limited by the number of
	 * rbd devices * queue depth, so leave @max_active at default.
	 */
	rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
	if (!rbd_wq) {
		rc = -ENOMEM;
		goto err_out_slab;
	}

	if (single_major) {
		rbd_major = register_blkdev(0, RBD_DRV_NAME);
		if (rbd_major < 0) {
			rc = rbd_major;
			goto err_out_wq;
		}
	}

	rc = rbd_sysfs_init();
	if (rc)
		goto err_out_blkdev;

	if (single_major)
		pr_info("loaded (major %d)\n", rbd_major);
	else
		pr_info("loaded\n");

	return 0;

err_out_blkdev:
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
err_out_wq:
	destroy_workqueue(rbd_wq);
err_out_slab:
	rbd_slab_exit();
	return rc;
}

static void __exit rbd_exit(void)
{
	ida_destroy(&rbd_dev_id_ida);
	rbd_sysfs_cleanup();
	if (single_major)
		unregister_blkdev(rbd_major, RBD_DRV_NAME);
	destroy_workqueue(rbd_wq);
	rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
MODULE_LICENSE("GPL");