/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, -EINVAL is
 * returned without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}
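
/*
 * Illustrative semantics: parent_ref below is managed with this pair
 * of helpers.  Once atomic_dec_return_safe() has brought the counter
 * down to 0, a later atomic_inc_return_safe() leaves it at 0 and
 * returns 0, so a stale reference cannot resurrect a dropped parent.
 */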

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}

#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
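
/*
 * For reference: with NAME_MAX at 255 and the "snap_" prefix taking
 * 5 bytes (sizeof the literal minus its trailing NUL), this caps
 * snapshot names at 250 bytes.
 */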

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_DATA_POOL)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
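
/*
 * Note the gap between bits 2 and 7: the intervening feature bits
 * (object-map, fast-diff, deep-flatten and journaling, as defined by
 * userspace rbd) are not implemented by this kernel client.  An image
 * carrying any feature bit outside RBD_FEATURES_SUPPORTED is refused
 * at image probe time.
 */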

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
	OBJ_OP_WRITE,
	OBJ_OP_READ,
	OBJ_OP_DISCARD,
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	u64			object_no;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64	size;
	u64	features;
	bool	read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);		/* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static struct bio_set		*rbd_bio_clone;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * Default to false for now, as single-major requires >= 0.75 version of
 * userspace rbd utility.
 */
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
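
/*
 * Illustrative usage:
 *
 *   modprobe rbd single_major=Y
 *
 * With single_major enabled, all rbd devices share one major number
 * and are told apart by minor (see rbd_dev_id_to_minor() below), and
 * the add_single_major/remove_single_major bus files made visible by
 * rbd_bus_is_visible() are used for mapping.
 */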

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
static void rbd_spec_put(struct rbd_spec *spec);

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
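
/*
 * Worked example: with RBD_SINGLE_MAJOR_PART_SHIFT of 4, dev_id 3
 * yields first minor 48, and minors 48..63 (the whole device plus up
 * to 15 partitions) all map back to dev_id 3.
 */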

static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}

static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
{
	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
}

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};
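
/*
 * Illustrative sysfs interaction, following the format documented in
 * Documentation/ABI/testing/sysfs-bus-rbd (monitor address, auth
 * options, pool and image name below are placeholders):
 *
 *   echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo" \
 *       > /sys/bus/rbd/add
 *   echo 0 > /sys/bus/rbd/remove    # 0 is the device id to unmap
 */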

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ret = 0;
	int val;
	bool ro;
	bool ro_changed = false;

	/* get_user() may sleep, so call it before taking rbd_dev->lock */
	if (get_user(val, (int __user *)(arg)))
		return -EFAULT;

	ro = val ? true : false;
	/* Snapshots do not allow writes */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	/* prevent others from opening this device */
	if (rbd_dev->open_count > 1) {
		ret = -EBUSY;
		goto out;
	}

	if (rbd_dev->mapping.read_only != ro) {
		rbd_dev->mapping.read_only = ro;
		ro_changed = true;
	}

out:
	spin_unlock_irq(&rbd_dev->lock);
	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
	if (ret == 0 && ro_changed)
		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

	return ret;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret = 0;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}
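
/*
 * Only BLKROSET is handled here; everything else gets -ENOTTY.  From
 * userspace this path can be exercised with, e.g.:
 *
 *   blockdev --setro /dev/rbd0
 */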

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_exclusive, "exclusive"},
	{Opt_err, NULL}
};

struct rbd_options {
	int	queue_depth;
	bool	read_only;
	bool	lock_on_read;
	bool	exclusive;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		rbd_opts->queue_depth = intval;
		break;
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	case Opt_lock_on_read:
		rbd_opts->lock_on_read = true;
		break;
	case Opt_exclusive:
		rbd_opts->exclusive = true;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}
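
/*
 * These per-mapping options ride along in the options field of the
 * string written to the add file; tokens libceph does not itself
 * recognize are handed to this parser.  Illustrative example (monitor
 * address and credentials are placeholders):
 *
 *   echo "1.2.3.4:6789 name=admin,queue_depth=512,lock_on_read rbd foo" \
 *       > /sys/bus/rbd/add
 */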

static char* obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	default:
		return "???";
	}
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&client_mutex);

	return rbdc;
}
/*
 * Destroy ceph client.
 *
 * rbd_client_list_lock is taken here to unlink the client, so the
 * caller must not already hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;
	/*
	 * Not only that, but the size of the entire snapshot header
	 * must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * returns the size of an object in the image
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1U << header->obj_order;
}
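
/*
 * Example: the common RBD default of obj_order 22 gives 4 MiB objects.
 * For format 1 images rbd_dev_ondisk_valid() above confines the order
 * to [SECTOR_SHIFT, 31], so this shift cannot overflow the u32.
 */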

static void rbd_init_layout(struct rbd_device *rbd_dev)
{
	if (rbd_dev->header.stripe_unit == 0 ||
	    rbd_dev->header.stripe_count == 0) {
		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
		rbd_dev->header.stripe_count = 1;
	}

	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
			  rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		object_prefix = kstrndup(ondisk->object_prefix,
					 sizeof(ondisk->object_prefix),
					 GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */
		snap_sizes = kmalloc_array(snap_count,
					   sizeof(*header->snap_sizes),
					   GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;
		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		rbd_init_layout(rbd_dev);
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}
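
/*
 * Worked example: for snapc->snaps of { 12, 7, 3 } (highest id first),
 * bsearch() with this comparator locates id 7 at index 1; a lookup of
 * id 5 finds nothing, which rbd_dev_snap_index() below reports as
 * BAD_SNAP_INDEX.
 */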

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = rbd_obj_bytes(&rbd_dev->header);

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = rbd_obj_bytes(&rbd_dev->header);

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
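
/*
 * Worked example with 4 MiB objects: an image I/O at offset 5 MiB for
 * 2 MiB has an in-object offset of 1 MiB, so rbd_segment_length()
 * leaves it at 2 MiB (it fits), whereas a 4 MiB request at the same
 * offset is trimmed to 3 MiB, the remainder spilling into the next
 * object.  Note the masking relies on the object size being a power
 * of two.
 */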

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec bv;
	struct bvec_iter iter;
	unsigned long flags;
	void *buf;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, iter) {
			if (pos + bv.bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(&bv, &flags);
				memset(buf + remainder, 0,
				       bv.bv_len - remainder);
				flush_dcache_page(bv.bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv.bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bio;

	bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio_advance(bio, offset);
	bio->bi_iter.bi_size = len;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_iter.bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_iter.bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}
/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses to two existence
 * checks are separated by the creation of the target object, so
 * that the first ("doesn't exist") response arrives *after* the
 * second ("does exist").  In that case we ignore the response that
 * arrives second: EXISTS, once set, is never cleared.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;

	return obj_request->img_offset <
	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
}
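
/*
 * Worked example (assuming 4 MiB objects): a parent_overlap of 5 MiB
 * rounds up to 8 MiB here, so requests into objects 0 and 1 are
 * treated as possibly overlapping the parent; the exact 5 MiB figure
 * is applied later in the parent read path.
 */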

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     kref_read(&img_request->kref));
	kref_get(&img_request->kref);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		kref_read(&img_request->kref));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);

static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
	     obj_request, obj_request->object_no, obj_request->offset,
	     obj_request->length, osd_req);
	if (obj_request_img_data_test(obj_request)) {
		WARN_ON(obj_request->callback != rbd_img_obj_callback);
		rbd_img_request_get(obj_request->img_request);
	}
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

/*
 * Set the discard flag when the img_request is a discard request
 */
1667 static void img_request_discard_set(struct rbd_img_request *img_request)
1668 {
1669 set_bit(IMG_REQ_DISCARD, &img_request->flags);
1670 smp_mb();
1671 }
1672
1673 static bool img_request_discard_test(struct rbd_img_request *img_request)
1674 {
1675 smp_mb();
1676 return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1677 }
1678
1679 static void img_request_child_set(struct rbd_img_request *img_request)
1680 {
1681 set_bit(IMG_REQ_CHILD, &img_request->flags);
1682 smp_mb();
1683 }
1684
1685 static void img_request_child_clear(struct rbd_img_request *img_request)
1686 {
1687 clear_bit(IMG_REQ_CHILD, &img_request->flags);
1688 smp_mb();
1689 }
1690
1691 static bool img_request_child_test(struct rbd_img_request *img_request)
1692 {
1693 smp_mb();
1694 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1695 }
1696
1697 static void img_request_layered_set(struct rbd_img_request *img_request)
1698 {
1699 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1700 smp_mb();
1701 }
1702
1703 static void img_request_layered_clear(struct rbd_img_request *img_request)
1704 {
1705 clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1706 smp_mb();
1707 }
1708
1709 static bool img_request_layered_test(struct rbd_img_request *img_request)
1710 {
1711 smp_mb();
1712 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1713 }
1714
1715 static enum obj_operation_type
1716 rbd_img_request_op_type(struct rbd_img_request *img_request)
1717 {
1718 if (img_request_write_test(img_request))
1719 return OBJ_OP_WRITE;
1720 else if (img_request_discard_test(img_request))
1721 return OBJ_OP_DISCARD;
1722 else
1723 return OBJ_OP_READ;
1724 }
1725
1726 static void
1727 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1728 {
1729 u64 xferred = obj_request->xferred;
1730 u64 length = obj_request->length;
1731
1732 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1733 obj_request, obj_request->img_request, obj_request->result,
1734 xferred, length);
1735 /*
1736 * ENOENT means a hole in the image. We zero-fill the entire
1737 * length of the request. A short read also implies zero-fill
1738 * to the end of the request. An error requires the whole
1739 * length of the request to be reported finished with an error
1740 * to the block layer. In each case we update the xferred
1741 * count to indicate the whole request was satisfied.
1742 */
1743 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1744 if (obj_request->result == -ENOENT) {
1745 if (obj_request->type == OBJ_REQUEST_BIO)
1746 zero_bio_chain(obj_request->bio_list, 0);
1747 else
1748 zero_pages(obj_request->pages, 0, length);
1749 obj_request->result = 0;
1750 } else if (xferred < length && !obj_request->result) {
1751 if (obj_request->type == OBJ_REQUEST_BIO)
1752 zero_bio_chain(obj_request->bio_list, xferred);
1753 else
1754 zero_pages(obj_request->pages, xferred, length);
1755 }
1756 obj_request->xferred = length;
1757 obj_request_done_set(obj_request);
1758 }
1759
1760 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1761 {
1762 dout("%s: obj %p cb %p\n", __func__, obj_request,
1763 obj_request->callback);
1764 if (obj_request->callback)
1765 obj_request->callback(obj_request);
1766 else
1767 complete_all(&obj_request->completion);
1768 }
1769
1770 static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1771 {
1772 obj_request->result = err;
1773 obj_request->xferred = 0;
1774 /*
1775 * kludge - mirror rbd_obj_request_submit() to match a put in
1776 * rbd_img_obj_callback()
1777 */
1778 if (obj_request_img_data_test(obj_request)) {
1779 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1780 rbd_img_request_get(obj_request->img_request);
1781 }
1782 obj_request_done_set(obj_request);
1783 rbd_obj_request_complete(obj_request);
1784 }
1785
1786 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1787 {
1788 struct rbd_img_request *img_request = NULL;
1789 struct rbd_device *rbd_dev = NULL;
1790 bool layered = false;
1791
1792 if (obj_request_img_data_test(obj_request)) {
1793 img_request = obj_request->img_request;
1794 layered = img_request && img_request_layered_test(img_request);
1795 rbd_dev = img_request->rbd_dev;
1796 }
1797
1798 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1799 obj_request, img_request, obj_request->result,
1800 obj_request->xferred, obj_request->length);
1801 if (layered && obj_request->result == -ENOENT &&
1802 obj_request->img_offset < rbd_dev->parent_overlap)
1803 rbd_img_parent_read(obj_request);
1804 else if (img_request)
1805 rbd_img_obj_request_read_callback(obj_request);
1806 else
1807 obj_request_done_set(obj_request);
1808 }
1809
1810 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1811 {
1812 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1813 obj_request->result, obj_request->length);
1814 /*
1815 * There is no such thing as a successful short write. Set
1816 * it to our originally-requested length.
1817 */
1818 obj_request->xferred = obj_request->length;
1819 obj_request_done_set(obj_request);
1820 }
1821
1822 static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1823 {
1824 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1825 obj_request->result, obj_request->length);
1826 /*
1827 * There is no such thing as a successful short discard. Set
1828 * it to our originally-requested length.
1829 */
1830 obj_request->xferred = obj_request->length;
1831 /* discarding a non-existent object is not a problem */
1832 if (obj_request->result == -ENOENT)
1833 obj_request->result = 0;
1834 obj_request_done_set(obj_request);
1835 }
1836
1837 /*
1838 * For a simple stat call there's nothing to do. We'll do more if
1839 * this is part of a write sequence for a layered image.
1840 */
1841 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1842 {
1843 dout("%s: obj %p\n", __func__, obj_request);
1844 obj_request_done_set(obj_request);
1845 }
1846
1847 static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1848 {
1849 dout("%s: obj %p\n", __func__, obj_request);
1850
1851 if (obj_request_img_data_test(obj_request))
1852 rbd_osd_copyup_callback(obj_request);
1853 else
1854 obj_request_done_set(obj_request);
1855 }
1856
1857 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1858 {
1859 struct rbd_obj_request *obj_request = osd_req->r_priv;
1860 u16 opcode;
1861
1862 dout("%s: osd_req %p\n", __func__, osd_req);
1863 rbd_assert(osd_req == obj_request->osd_req);
1864 if (obj_request_img_data_test(obj_request)) {
1865 rbd_assert(obj_request->img_request);
1866 rbd_assert(obj_request->which != BAD_WHICH);
1867 } else {
1868 rbd_assert(obj_request->which == BAD_WHICH);
1869 }
1870
1871 if (osd_req->r_result < 0)
1872 obj_request->result = osd_req->r_result;
1873
1874 /*
1875 * We support a 64-bit length, but ultimately it has to be
1876 * passed to the block layer, which just supports a 32-bit
1877 * length field.
1878 */
1879 obj_request->xferred = osd_req->r_ops[0].outdata_len;
1880 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1881
1882 opcode = osd_req->r_ops[0].op;
1883 switch (opcode) {
1884 case CEPH_OSD_OP_READ:
1885 rbd_osd_read_callback(obj_request);
1886 break;
1887 case CEPH_OSD_OP_SETALLOCHINT:
1888 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1889 osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
1890 /* fall through */
1891 case CEPH_OSD_OP_WRITE:
1892 case CEPH_OSD_OP_WRITEFULL:
1893 rbd_osd_write_callback(obj_request);
1894 break;
1895 case CEPH_OSD_OP_STAT:
1896 rbd_osd_stat_callback(obj_request);
1897 break;
1898 case CEPH_OSD_OP_DELETE:
1899 case CEPH_OSD_OP_TRUNCATE:
1900 case CEPH_OSD_OP_ZERO:
1901 rbd_osd_discard_callback(obj_request);
1902 break;
1903 case CEPH_OSD_OP_CALL:
1904 rbd_osd_call_callback(obj_request);
1905 break;
1906 default:
1907 rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
1908 obj_request->object_no, opcode);
1909 break;
1910 }
1911
1912 if (obj_request_done_test(obj_request))
1913 rbd_obj_request_complete(obj_request);
1914 }
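/*
 * Note that the dispatch above keys off r_ops[0].op, so a hint+write
 * request (SETALLOCHINT first, then WRITE or WRITEFULL) takes the
 * SETALLOCHINT case and falls through to the write callback; the
 * transfer count is likewise taken from op 0 only.
 */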
1915
1916 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1917 {
1918 struct ceph_osd_request *osd_req = obj_request->osd_req;
1919
1920 rbd_assert(obj_request_img_data_test(obj_request));
1921 osd_req->r_snapid = obj_request->img_request->snap_id;
1922 }
1923
1924 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1925 {
1926 struct ceph_osd_request *osd_req = obj_request->osd_req;
1927
1928 ktime_get_real_ts(&osd_req->r_mtime);
1929 osd_req->r_data_offset = obj_request->offset;
1930 }
1931
1932 static struct ceph_osd_request *
1933 __rbd_osd_req_create(struct rbd_device *rbd_dev,
1934 struct ceph_snap_context *snapc,
1935 int num_ops, unsigned int flags,
1936 struct rbd_obj_request *obj_request)
1937 {
1938 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1939 struct ceph_osd_request *req;
1940 const char *name_format = rbd_dev->image_format == 1 ?
1941 RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1942
1943 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1944 if (!req)
1945 return NULL;
1946
1947 req->r_flags = flags;
1948 req->r_callback = rbd_osd_req_callback;
1949 req->r_priv = obj_request;
1950
1951 req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1952 if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1953 rbd_dev->header.object_prefix, obj_request->object_no))
1954 goto err_req;
1955
1956 if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1957 goto err_req;
1958
1959 return req;
1960
1961 err_req:
1962 ceph_osdc_put_request(req);
1963 return NULL;
1964 }
1965
1966 /*
1967 * Create an osd request. A read request has one osd op (read).
1968 * A write request has either one (watch) or two (hint+write) osd ops.
1969 * (All rbd data writes are prefixed with an allocation hint op, but
1970 * technically osd watch is a write request, hence this distinction.)
1971 */
1972 static struct ceph_osd_request *rbd_osd_req_create(
1973 struct rbd_device *rbd_dev,
1974 enum obj_operation_type op_type,
1975 unsigned int num_ops,
1976 struct rbd_obj_request *obj_request)
1977 {
1978 struct ceph_snap_context *snapc = NULL;
1979
1980 if (obj_request_img_data_test(obj_request) &&
1981 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
1982 struct rbd_img_request *img_request = obj_request->img_request;
1983 if (op_type == OBJ_OP_WRITE) {
1984 rbd_assert(img_request_write_test(img_request));
1985 } else {
1986 rbd_assert(img_request_discard_test(img_request));
1987 }
1988 snapc = img_request->snapc;
1989 }
1990
1991 rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
1992
1993 return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
1994 (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
1995 CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
1996 }
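/*
 * Illustrative call for a data write, which becomes a two-op
 * (hint + write) request:
 *
 *	osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, 2,
 *				     obj_request);
 *
 * Reads, stats and discards pass num_ops == 1.
 */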
1997
1998 /*
1999 * Create a copyup osd request based on the information in the object
2000 * request supplied. A copyup request has two or three osd ops: a
2001 * copyup method call, optionally an allocation hint op, and a
2002 * write, truncate or zero op.
2003 */
2004 static struct ceph_osd_request *
2005 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
2006 {
2007 struct rbd_img_request *img_request;
2008 int num_osd_ops = 3;
2009
2010 rbd_assert(obj_request_img_data_test(obj_request));
2011 img_request = obj_request->img_request;
2012 rbd_assert(img_request);
2013 rbd_assert(img_request_write_test(img_request) ||
2014 img_request_discard_test(img_request));
2015
2016 if (img_request_discard_test(img_request))
2017 num_osd_ops = 2;
2018
2019 return __rbd_osd_req_create(img_request->rbd_dev,
2020 img_request->snapc, num_osd_ops,
2021 CEPH_OSD_FLAG_WRITE, obj_request);
2022 }
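/*
 * The resulting op layout (filled in by
 * rbd_img_obj_parent_read_full_callback()) is, in summary:
 *
 *	write:   op 0 CALL rbd.copyup, op 1 SETALLOCHINT, op 2 WRITE(FULL)
 *	discard: op 0 CALL rbd.copyup, op 1 TRUNCATE or ZERO
 *
 * which is why a discard copyup needs only two ops.
 */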
2023
2024 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2025 {
2026 ceph_osdc_put_request(osd_req);
2027 }
2028
2029 static struct rbd_obj_request *
2030 rbd_obj_request_create(enum obj_request_type type)
2031 {
2032 struct rbd_obj_request *obj_request;
2033
2034 rbd_assert(obj_request_type_valid(type));
2035
2036 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
2037 if (!obj_request)
2038 return NULL;
2039
2040 obj_request->which = BAD_WHICH;
2041 obj_request->type = type;
2042 INIT_LIST_HEAD(&obj_request->links);
2043 init_completion(&obj_request->completion);
2044 kref_init(&obj_request->kref);
2045
2046 dout("%s %p\n", __func__, obj_request);
2047 return obj_request;
2048 }
2049
2050 static void rbd_obj_request_destroy(struct kref *kref)
2051 {
2052 struct rbd_obj_request *obj_request;
2053
2054 obj_request = container_of(kref, struct rbd_obj_request, kref);
2055
2056 dout("%s: obj %p\n", __func__, obj_request);
2057
2058 rbd_assert(obj_request->img_request == NULL);
2059 rbd_assert(obj_request->which == BAD_WHICH);
2060
2061 if (obj_request->osd_req)
2062 rbd_osd_req_destroy(obj_request->osd_req);
2063
2064 rbd_assert(obj_request_type_valid(obj_request->type));
2065 switch (obj_request->type) {
2066 case OBJ_REQUEST_NODATA:
2067 break; /* Nothing to do */
2068 case OBJ_REQUEST_BIO:
2069 if (obj_request->bio_list)
2070 bio_chain_put(obj_request->bio_list);
2071 break;
2072 case OBJ_REQUEST_PAGES:
2073 /* img_data requests don't own their page array */
2074 if (obj_request->pages &&
2075 !obj_request_img_data_test(obj_request))
2076 ceph_release_page_vector(obj_request->pages,
2077 obj_request->page_count);
2078 break;
2079 }
2080
2081 kmem_cache_free(rbd_obj_request_cache, obj_request);
2082 }
2083
2084 /* It's OK to call this for a device with no parent */
2085
2086 static void rbd_spec_put(struct rbd_spec *spec);
2087 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2088 {
2089 rbd_dev_remove_parent(rbd_dev);
2090 rbd_spec_put(rbd_dev->parent_spec);
2091 rbd_dev->parent_spec = NULL;
2092 rbd_dev->parent_overlap = 0;
2093 }
2094
2095 /*
2096 * Parent image reference counting is used to determine when an
2097 * image's parent fields can be safely torn down--after there are no
2098 * more in-flight requests to the parent image. When the last
2099 * reference is dropped, cleaning them up is safe.
2100 */
2101 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2102 {
2103 int counter;
2104
2105 if (!rbd_dev->parent_spec)
2106 return;
2107
2108 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2109 if (counter > 0)
2110 return;
2111
2112 /* Last reference; clean up parent data structures */
2113
2114 if (!counter)
2115 rbd_dev_unparent(rbd_dev);
2116 else
2117 rbd_warn(rbd_dev, "parent reference underflow");
2118 }
2119
2120 /*
2121 * If an image has a non-zero parent overlap, get a reference to its
2122 * parent.
2123 *
2124 * Returns true if the rbd device has a parent with a non-zero
2125 * overlap and a reference for it was successfully taken, or
2126 * false otherwise.
2127 */
2128 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2129 {
2130 int counter = 0;
2131
2132 if (!rbd_dev->parent_spec)
2133 return false;
2134
2135 down_read(&rbd_dev->header_rwsem);
2136 if (rbd_dev->parent_overlap)
2137 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2138 up_read(&rbd_dev->header_rwsem);
2139
2140 if (counter < 0)
2141 rbd_warn(rbd_dev, "parent reference overflow");
2142
2143 return counter > 0;
2144 }
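/*
 * A successful rbd_dev_parent_get() must be balanced by a
 * rbd_dev_parent_put().  rbd_img_request_create() takes the
 * reference when it marks a request layered, and
 * rbd_img_request_destroy() drops it.
 */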
2145
2146 /*
2147 * Caller is responsible for filling in the list of object requests
2148 * that comprises the image request, and the Linux request pointer
2149 * (if there is one).
2150 */
2151 static struct rbd_img_request *rbd_img_request_create(
2152 struct rbd_device *rbd_dev,
2153 u64 offset, u64 length,
2154 enum obj_operation_type op_type,
2155 struct ceph_snap_context *snapc)
2156 {
2157 struct rbd_img_request *img_request;
2158
2159 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2160 if (!img_request)
2161 return NULL;
2162
2163 img_request->rq = NULL;
2164 img_request->rbd_dev = rbd_dev;
2165 img_request->offset = offset;
2166 img_request->length = length;
2167 img_request->flags = 0;
2168 if (op_type == OBJ_OP_DISCARD) {
2169 img_request_discard_set(img_request);
2170 img_request->snapc = snapc;
2171 } else if (op_type == OBJ_OP_WRITE) {
2172 img_request_write_set(img_request);
2173 img_request->snapc = snapc;
2174 } else {
2175 img_request->snap_id = rbd_dev->spec->snap_id;
2176 }
2177 if (rbd_dev_parent_get(rbd_dev))
2178 img_request_layered_set(img_request);
2179 spin_lock_init(&img_request->completion_lock);
2180 img_request->next_completion = 0;
2181 img_request->callback = NULL;
2182 img_request->result = 0;
2183 img_request->obj_request_count = 0;
2184 INIT_LIST_HEAD(&img_request->obj_requests);
2185 kref_init(&img_request->kref);
2186
2187 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2188 obj_op_name(op_type), offset, length, img_request);
2189
2190 return img_request;
2191 }
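/*
 * Illustrative lifecycle of an image request (error handling
 * omitted):
 *
 *	img_request = rbd_img_request_create(rbd_dev, off, len,
 *					     op_type, snapc);
 *	if (img_request &&
 *	    !rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, bio))
 *		rbd_img_request_submit(img_request);
 *
 * The creation reference is dropped with rbd_img_request_put().
 */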
2192
2193 static void rbd_img_request_destroy(struct kref *kref)
2194 {
2195 struct rbd_img_request *img_request;
2196 struct rbd_obj_request *obj_request;
2197 struct rbd_obj_request *next_obj_request;
2198
2199 img_request = container_of(kref, struct rbd_img_request, kref);
2200
2201 dout("%s: img %p\n", __func__, img_request);
2202
2203 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2204 rbd_img_obj_request_del(img_request, obj_request);
2205 rbd_assert(img_request->obj_request_count == 0);
2206
2207 if (img_request_layered_test(img_request)) {
2208 img_request_layered_clear(img_request);
2209 rbd_dev_parent_put(img_request->rbd_dev);
2210 }
2211
2212 if (img_request_write_test(img_request) ||
2213 img_request_discard_test(img_request))
2214 ceph_put_snap_context(img_request->snapc);
2215
2216 kmem_cache_free(rbd_img_request_cache, img_request);
2217 }
2218
2219 static struct rbd_img_request *rbd_parent_request_create(
2220 struct rbd_obj_request *obj_request,
2221 u64 img_offset, u64 length)
2222 {
2223 struct rbd_img_request *parent_request;
2224 struct rbd_device *rbd_dev;
2225
2226 rbd_assert(obj_request->img_request);
2227 rbd_dev = obj_request->img_request->rbd_dev;
2228
2229 parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2230 length, OBJ_OP_READ, NULL);
2231 if (!parent_request)
2232 return NULL;
2233
2234 img_request_child_set(parent_request);
2235 rbd_obj_request_get(obj_request);
2236 parent_request->obj_request = obj_request;
2237
2238 return parent_request;
2239 }
2240
2241 static void rbd_parent_request_destroy(struct kref *kref)
2242 {
2243 struct rbd_img_request *parent_request;
2244 struct rbd_obj_request *orig_request;
2245
2246 parent_request = container_of(kref, struct rbd_img_request, kref);
2247 orig_request = parent_request->obj_request;
2248
2249 parent_request->obj_request = NULL;
2250 rbd_obj_request_put(orig_request);
2251 img_request_child_clear(parent_request);
2252
2253 rbd_img_request_destroy(kref);
2254 }
2255
2256 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2257 {
2258 struct rbd_img_request *img_request;
2259 unsigned int xferred;
2260 int result;
2261 bool more;
2262
2263 rbd_assert(obj_request_img_data_test(obj_request));
2264 img_request = obj_request->img_request;
2265
2266 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2267 xferred = (unsigned int)obj_request->xferred;
2268 result = obj_request->result;
2269 if (result) {
2270 struct rbd_device *rbd_dev = img_request->rbd_dev;
2271 enum obj_operation_type op_type;
2272
2273 if (img_request_discard_test(img_request))
2274 op_type = OBJ_OP_DISCARD;
2275 else if (img_request_write_test(img_request))
2276 op_type = OBJ_OP_WRITE;
2277 else
2278 op_type = OBJ_OP_READ;
2279
2280 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2281 obj_op_name(op_type), obj_request->length,
2282 obj_request->img_offset, obj_request->offset);
2283 rbd_warn(rbd_dev, " result %d xferred %x",
2284 result, xferred);
2285 if (!img_request->result)
2286 img_request->result = result;
2287 /*
2288 * Need to end I/O on the entire obj_request worth of
2289 * bytes in case of error.
2290 */
2291 xferred = obj_request->length;
2292 }
2293
2294 if (img_request_child_test(img_request)) {
2295 rbd_assert(img_request->obj_request != NULL);
2296 more = obj_request->which < img_request->obj_request_count - 1;
2297 } else {
2298 blk_status_t status = errno_to_blk_status(result);
2299
2300 rbd_assert(img_request->rq != NULL);
2301
2302 more = blk_update_request(img_request->rq, status, xferred);
2303 if (!more)
2304 __blk_mq_end_request(img_request->rq, status);
2305 }
2306
2307 return more;
2308 }
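/*
 * blk_update_request() above returns true while the block layer
 * request still has bytes outstanding; only after the final object
 * request has been accounted for does __blk_mq_end_request() run.
 */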
2309
2310 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2311 {
2312 struct rbd_img_request *img_request;
2313 u32 which = obj_request->which;
2314 bool more = true;
2315
2316 rbd_assert(obj_request_img_data_test(obj_request));
2317 img_request = obj_request->img_request;
2318
2319 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2320 rbd_assert(img_request != NULL);
2321 rbd_assert(img_request->obj_request_count > 0);
2322 rbd_assert(which != BAD_WHICH);
2323 rbd_assert(which < img_request->obj_request_count);
2324
2325 spin_lock_irq(&img_request->completion_lock);
2326 if (which != img_request->next_completion)
2327 goto out;
2328
2329 for_each_obj_request_from(img_request, obj_request) {
2330 rbd_assert(more);
2331 rbd_assert(which < img_request->obj_request_count);
2332
2333 if (!obj_request_done_test(obj_request))
2334 break;
2335 more = rbd_img_obj_end_request(obj_request);
2336 which++;
2337 }
2338
2339 rbd_assert(more ^ (which == img_request->obj_request_count));
2340 img_request->next_completion = which;
2341 out:
2342 spin_unlock_irq(&img_request->completion_lock);
2343 rbd_img_request_put(img_request);
2344
2345 if (!more)
2346 rbd_img_request_complete(img_request);
2347 }
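/*
 * Object requests may complete in any order, but the block layer is
 * told about them strictly in offset order: next_completion advances
 * only past requests whose done flag is set, so an out-of-order
 * completion simply waits for its predecessors to finish.
 */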
2348
2349 /*
2350 * Add individual osd ops to the given ceph_osd_request and prepare
2351 * them for submission. num_ops is the current number of
2352 osd operations already added to the osd request.
2353 */
2354 static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2355 struct ceph_osd_request *osd_request,
2356 enum obj_operation_type op_type,
2357 unsigned int num_ops)
2358 {
2359 struct rbd_img_request *img_request = obj_request->img_request;
2360 struct rbd_device *rbd_dev = img_request->rbd_dev;
2361 u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2362 u64 offset = obj_request->offset;
2363 u64 length = obj_request->length;
2364 u64 img_end;
2365 u16 opcode;
2366
2367 if (op_type == OBJ_OP_DISCARD) {
2368 if (!offset && length == object_size &&
2369 (!img_request_layered_test(img_request) ||
2370 !obj_request_overlaps_parent(obj_request))) {
2371 opcode = CEPH_OSD_OP_DELETE;
2372 } else if (offset + length == object_size) {
2373 opcode = CEPH_OSD_OP_TRUNCATE;
2374 } else {
2375 down_read(&rbd_dev->header_rwsem);
2376 img_end = rbd_dev->header.image_size;
2377 up_read(&rbd_dev->header_rwsem);
2378
2379 if (obj_request->img_offset + length == img_end)
2380 opcode = CEPH_OSD_OP_TRUNCATE;
2381 else
2382 opcode = CEPH_OSD_OP_ZERO;
2383 }
2384 } else if (op_type == OBJ_OP_WRITE) {
2385 if (!offset && length == object_size)
2386 opcode = CEPH_OSD_OP_WRITEFULL;
2387 else
2388 opcode = CEPH_OSD_OP_WRITE;
2389 osd_req_op_alloc_hint_init(osd_request, num_ops,
2390 object_size, object_size);
2391 num_ops++;
2392 } else {
2393 opcode = CEPH_OSD_OP_READ;
2394 }
2395
2396 if (opcode == CEPH_OSD_OP_DELETE)
2397 osd_req_op_init(osd_request, num_ops, opcode, 0);
2398 else
2399 osd_req_op_extent_init(osd_request, num_ops, opcode,
2400 offset, length, 0, 0);
2401
2402 if (obj_request->type == OBJ_REQUEST_BIO)
2403 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2404 obj_request->bio_list, length);
2405 else if (obj_request->type == OBJ_REQUEST_PAGES)
2406 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2407 obj_request->pages, length,
2408 offset & ~PAGE_MASK, false, false);
2409
2410 /* Discards are also writes */
2411 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2412 rbd_osd_req_format_write(obj_request);
2413 else
2414 rbd_osd_req_format_read(obj_request);
2415 }
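/*
 * Discard opcode selection above, in summary:
 *
 *	whole object, no parent data beneath it	-> DELETE
 *	range extends to the end of the object	-> TRUNCATE
 *	range extends to the end of the image	-> TRUNCATE
 *	anything else				-> ZERO
 */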
2416
2417 /*
2418 * Split up an image request into one or more object requests, each
2419 * to a different object. The "type" parameter indicates whether
2420 * "data_desc" is the pointer to the head of a list of bio
2421 * structures, or the base of a page array. In either case this
2422 * function assumes data_desc describes memory sufficient to hold
2423 * all data described by the image request.
2424 */
2425 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2426 enum obj_request_type type,
2427 void *data_desc)
2428 {
2429 struct rbd_device *rbd_dev = img_request->rbd_dev;
2430 struct rbd_obj_request *obj_request = NULL;
2431 struct rbd_obj_request *next_obj_request;
2432 struct bio *bio_list = NULL;
2433 unsigned int bio_offset = 0;
2434 struct page **pages = NULL;
2435 enum obj_operation_type op_type;
2436 u64 img_offset;
2437 u64 resid;
2438
2439 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2440 (int)type, data_desc);
2441
2442 img_offset = img_request->offset;
2443 resid = img_request->length;
2444 rbd_assert(resid > 0);
2445 op_type = rbd_img_request_op_type(img_request);
2446
2447 if (type == OBJ_REQUEST_BIO) {
2448 bio_list = data_desc;
2449 rbd_assert(img_offset ==
2450 bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2451 } else if (type == OBJ_REQUEST_PAGES) {
2452 pages = data_desc;
2453 }
2454
2455 while (resid) {
2456 struct ceph_osd_request *osd_req;
2457 u64 object_no = img_offset >> rbd_dev->header.obj_order;
2458 u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2459 u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
2460
2461 obj_request = rbd_obj_request_create(type);
2462 if (!obj_request)
2463 goto out_unwind;
2464
2465 obj_request->object_no = object_no;
2466 obj_request->offset = offset;
2467 obj_request->length = length;
2468
2469 /*
2470 * set obj_request->img_request before creating the
2471 * osd_request so that it gets the right snapc
2472 */
2473 rbd_img_obj_request_add(img_request, obj_request);
2474
2475 if (type == OBJ_REQUEST_BIO) {
2476 unsigned int clone_size;
2477
2478 rbd_assert(length <= (u64)UINT_MAX);
2479 clone_size = (unsigned int)length;
2480 obj_request->bio_list =
2481 bio_chain_clone_range(&bio_list,
2482 &bio_offset,
2483 clone_size,
2484 GFP_NOIO);
2485 if (!obj_request->bio_list)
2486 goto out_unwind;
2487 } else if (type == OBJ_REQUEST_PAGES) {
2488 unsigned int page_count;
2489
2490 obj_request->pages = pages;
2491 page_count = (u32)calc_pages_for(offset, length);
2492 obj_request->page_count = page_count;
2493 if ((offset + length) & ~PAGE_MASK)
2494 page_count--; /* more on last page */
2495 pages += page_count;
2496 }
2497
2498 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2499 (op_type == OBJ_OP_WRITE) ? 2 : 1,
2500 obj_request);
2501 if (!osd_req)
2502 goto out_unwind;
2503
2504 obj_request->osd_req = osd_req;
2505 obj_request->callback = rbd_img_obj_callback;
2506 obj_request->img_offset = img_offset;
2507
2508 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2509
2510 img_offset += length;
2511 resid -= length;
2512 }
2513
2514 return 0;
2515
2516 out_unwind:
2517 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2518 rbd_img_obj_request_del(img_request, obj_request);
2519
2520 return -ENOMEM;
2521 }
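/*
 * Worked example (illustrative, 4M objects, obj_order 22): an 8K
 * write at image offset 0x3ff000 becomes two object requests --
 * object 0 at offset 0x3ff000 for 0x1000 bytes, then object 1 at
 * offset 0 for the remaining 0x1000 bytes.
 */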
2522
2523 static void
2524 rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2525 {
2526 struct rbd_img_request *img_request;
2527 struct rbd_device *rbd_dev;
2528 struct page **pages;
2529 u32 page_count;
2530
2531 dout("%s: obj %p\n", __func__, obj_request);
2532
2533 rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2534 obj_request->type == OBJ_REQUEST_NODATA);
2535 rbd_assert(obj_request_img_data_test(obj_request));
2536 img_request = obj_request->img_request;
2537 rbd_assert(img_request);
2538
2539 rbd_dev = img_request->rbd_dev;
2540 rbd_assert(rbd_dev);
2541
2542 pages = obj_request->copyup_pages;
2543 rbd_assert(pages != NULL);
2544 obj_request->copyup_pages = NULL;
2545 page_count = obj_request->copyup_page_count;
2546 rbd_assert(page_count);
2547 obj_request->copyup_page_count = 0;
2548 ceph_release_page_vector(pages, page_count);
2549
2550 /*
2551 * We want the transfer count to reflect the size of the
2552 * original write request. There is no such thing as a
2553 * successful short write, so if the request was successful
2554 * we can just set it to the originally-requested length.
2555 */
2556 if (!obj_request->result)
2557 obj_request->xferred = obj_request->length;
2558
2559 obj_request_done_set(obj_request);
2560 }
2561
2562 static void
2563 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2564 {
2565 struct rbd_obj_request *orig_request;
2566 struct ceph_osd_request *osd_req;
2567 struct rbd_device *rbd_dev;
2568 struct page **pages;
2569 enum obj_operation_type op_type;
2570 u32 page_count;
2571 int img_result;
2572 u64 parent_length;
2573
2574 rbd_assert(img_request_child_test(img_request));
2575
2576 /* First get what we need from the image request */
2577
2578 pages = img_request->copyup_pages;
2579 rbd_assert(pages != NULL);
2580 img_request->copyup_pages = NULL;
2581 page_count = img_request->copyup_page_count;
2582 rbd_assert(page_count);
2583 img_request->copyup_page_count = 0;
2584
2585 orig_request = img_request->obj_request;
2586 rbd_assert(orig_request != NULL);
2587 rbd_assert(obj_request_type_valid(orig_request->type));
2588 img_result = img_request->result;
2589 parent_length = img_request->length;
2590 rbd_assert(img_result || parent_length == img_request->xferred);
2591 rbd_img_request_put(img_request);
2592
2593 rbd_assert(orig_request->img_request);
2594 rbd_dev = orig_request->img_request->rbd_dev;
2595 rbd_assert(rbd_dev);
2596
2597 /*
2598 * If the overlap has become 0 (most likely because the
2599 * image has been flattened) we need to free the pages
2600 * and re-submit the original write request.
2601 */
2602 if (!rbd_dev->parent_overlap) {
2603 ceph_release_page_vector(pages, page_count);
2604 rbd_obj_request_submit(orig_request);
2605 return;
2606 }
2607
2608 if (img_result)
2609 goto out_err;
2610
2611 /*
2612 * The original osd request is of no use to us any more.
2613 * We need a new one that can hold the two or three ops in a copyup
2614 * request. Allocate the new copyup osd request for the
2615 * original request, and release the old one.
2616 */
2617 img_result = -ENOMEM;
2618 osd_req = rbd_osd_req_create_copyup(orig_request);
2619 if (!osd_req)
2620 goto out_err;
2621 rbd_osd_req_destroy(orig_request->osd_req);
2622 orig_request->osd_req = osd_req;
2623 orig_request->copyup_pages = pages;
2624 orig_request->copyup_page_count = page_count;
2625
2626 /* Initialize the copyup op */
2627
2628 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2629 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2630 false, false);
2631
2632 /* Add the other op(s) */
2633
2634 op_type = rbd_img_request_op_type(orig_request->img_request);
2635 rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2636
2637 /* All set, send it off. */
2638
2639 rbd_obj_request_submit(orig_request);
2640 return;
2641
2642 out_err:
2643 ceph_release_page_vector(pages, page_count);
2644 rbd_obj_request_error(orig_request, img_result);
2645 }
2646
2647 /*
2648 * Read from the parent image the range of data that covers the
2649 * entire target of the given object request. This is used for
2650 * satisfying a layered image write request when the target of an
2651 * object request from the image request does not exist.
2652 *
2653 * A page array big enough to hold the returned data is allocated
2654 * and supplied to rbd_img_request_fill() as the "data descriptor."
2655 * When the read completes, this page array will be transferred to
2656 * the original object request for the copyup operation.
2657 *
2658 * If an error occurs, it is recorded as the result of the original
2659 * object request in rbd_img_obj_exists_callback().
2660 */
2661 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2662 {
2663 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2664 struct rbd_img_request *parent_request = NULL;
2665 u64 img_offset;
2666 u64 length;
2667 struct page **pages = NULL;
2668 u32 page_count;
2669 int result;
2670
2671 rbd_assert(rbd_dev->parent != NULL);
2672
2673 /*
2674 * Determine the byte range covered by the object in the
2675 * child image to which the original request was to be sent.
2676 */
2677 img_offset = obj_request->img_offset - obj_request->offset;
2678 length = rbd_obj_bytes(&rbd_dev->header);
2679
2680 /*
2681 * There is no defined parent data beyond the parent
2682 * overlap, so limit what we read at that boundary if
2683 * necessary.
2684 */
2685 if (img_offset + length > rbd_dev->parent_overlap) {
2686 rbd_assert(img_offset < rbd_dev->parent_overlap);
2687 length = rbd_dev->parent_overlap - img_offset;
2688 }
2689
2690 /*
2691 * Allocate a page array big enough to receive the data read
2692 * from the parent.
2693 */
2694 page_count = (u32)calc_pages_for(0, length);
2695 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2696 if (IS_ERR(pages)) {
2697 result = PTR_ERR(pages);
2698 pages = NULL;
2699 goto out_err;
2700 }
2701
2702 result = -ENOMEM;
2703 parent_request = rbd_parent_request_create(obj_request,
2704 img_offset, length);
2705 if (!parent_request)
2706 goto out_err;
2707
2708 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2709 if (result)
2710 goto out_err;
2711
2712 parent_request->copyup_pages = pages;
2713 parent_request->copyup_page_count = page_count;
2714 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2715
2716 result = rbd_img_request_submit(parent_request);
2717 if (!result)
2718 return 0;
2719
2720 parent_request->copyup_pages = NULL;
2721 parent_request->copyup_page_count = 0;
2722 parent_request->obj_request = NULL;
2723 rbd_obj_request_put(obj_request);
2724 out_err:
2725 if (pages)
2726 ceph_release_page_vector(pages, page_count);
2727 if (parent_request)
2728 rbd_img_request_put(parent_request);
2729 return result;
2730 }
2731
2732 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2733 {
2734 struct rbd_obj_request *orig_request;
2735 struct rbd_device *rbd_dev;
2736 int result;
2737
2738 rbd_assert(!obj_request_img_data_test(obj_request));
2739
2740 /*
2741 * All we need from the object request is the original
2742 * request and the result of the STAT op. Grab those, then
2743 * we're done with the request.
2744 */
2745 orig_request = obj_request->obj_request;
2746 obj_request->obj_request = NULL;
2747 rbd_obj_request_put(orig_request);
2748 rbd_assert(orig_request);
2749 rbd_assert(orig_request->img_request);
2750
2751 result = obj_request->result;
2752 obj_request->result = 0;
2753
2754 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2755 obj_request, orig_request, result,
2756 obj_request->xferred, obj_request->length);
2757 rbd_obj_request_put(obj_request);
2758
2759 /*
2760 * If the overlap has become 0 (most likely because the
2761 * image has been flattened) we need to re-submit the
2762 * original request.
2763 */
2764 rbd_dev = orig_request->img_request->rbd_dev;
2765 if (!rbd_dev->parent_overlap) {
2766 rbd_obj_request_submit(orig_request);
2767 return;
2768 }
2769
2770 /*
2771 * Our only purpose here is to determine whether the object
2772 * exists, and we don't want to treat the non-existence as
2773 * an error. If something else comes back, transfer the
2774 * error to the original request and complete it now.
2775 */
2776 if (!result) {
2777 obj_request_existence_set(orig_request, true);
2778 } else if (result == -ENOENT) {
2779 obj_request_existence_set(orig_request, false);
2780 } else {
2781 goto fail_orig_request;
2782 }
2783
2784 /*
2785 * Resubmit the original request now that we have recorded
2786 * whether the target object exists.
2787 */
2788 result = rbd_img_obj_request_submit(orig_request);
2789 if (result)
2790 goto fail_orig_request;
2791
2792 return;
2793
2794 fail_orig_request:
2795 rbd_obj_request_error(orig_request, result);
2796 }
2797
2798 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2799 {
2800 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2801 struct rbd_obj_request *stat_request;
2802 struct page **pages;
2803 u32 page_count;
2804 size_t size;
2805 int ret;
2806
2807 stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
2808 if (!stat_request)
2809 return -ENOMEM;
2810
2811 stat_request->object_no = obj_request->object_no;
2812
2813 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2814 stat_request);
2815 if (!stat_request->osd_req) {
2816 ret = -ENOMEM;
2817 goto fail_stat_request;
2818 }
2819
2820 /*
2821 * The response data for a STAT call consists of:
2822 * le64 length;
2823 * struct {
2824 * le32 tv_sec;
2825 * le32 tv_nsec;
2826 * } mtime;
2827 */
2828 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2829 page_count = (u32)calc_pages_for(0, size);
2830 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2831 if (IS_ERR(pages)) {
2832 ret = PTR_ERR(pages);
2833 goto fail_stat_request;
2834 }
2835
2836 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2837 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2838 false, false);
2839
2840 rbd_obj_request_get(obj_request);
2841 stat_request->obj_request = obj_request;
2842 stat_request->pages = pages;
2843 stat_request->page_count = page_count;
2844 stat_request->callback = rbd_img_obj_exists_callback;
2845
2846 rbd_obj_request_submit(stat_request);
2847 return 0;
2848
2849 fail_stat_request:
2850 rbd_obj_request_put(stat_request);
2851 return ret;
2852 }
2853
2854 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2855 {
2856 struct rbd_img_request *img_request = obj_request->img_request;
2857 struct rbd_device *rbd_dev = img_request->rbd_dev;
2858
2859 /* Reads */
2860 if (!img_request_write_test(img_request) &&
2861 !img_request_discard_test(img_request))
2862 return true;
2863
2864 /* Non-layered writes */
2865 if (!img_request_layered_test(img_request))
2866 return true;
2867
2868 /*
2869 * Layered writes outside of the parent overlap range don't
2870 * share any data with the parent.
2871 */
2872 if (!obj_request_overlaps_parent(obj_request))
2873 return true;
2874
2875 /*
2876 * Entire-object layered writes - we will overwrite whatever
2877 * parent data there is anyway.
2878 */
2879 if (!obj_request->offset &&
2880 obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2881 return true;
2882
2883 /*
2884 * If the object is known to already exist, its parent data has
2885 * already been copied.
2886 */
2887 if (obj_request_known_test(obj_request) &&
2888 obj_request_exists_test(obj_request))
2889 return true;
2890
2891 return false;
2892 }
2893
2894 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2895 {
2896 rbd_assert(obj_request_img_data_test(obj_request));
2897 rbd_assert(obj_request_type_valid(obj_request->type));
2898 rbd_assert(obj_request->img_request);
2899
2900 if (img_obj_request_simple(obj_request)) {
2901 rbd_obj_request_submit(obj_request);
2902 return 0;
2903 }
2904
2905 /*
2906 * It's a layered write. The target object might exist but
2907 * we may not know that yet. If we know it doesn't exist,
2908 * start by reading the data for the full target object from
2909 * the parent so we can use it for a copyup to the target.
2910 */
2911 if (obj_request_known_test(obj_request))
2912 return rbd_img_obj_parent_read_full(obj_request);
2913
2914 /* We don't know whether the target exists. Go find out. */
2915
2916 return rbd_img_obj_exists_submit(obj_request);
2917 }
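/*
 * Layered write submission thus has three outcomes: simple requests
 * go straight to the OSD, requests whose target is known not to
 * exist read from the parent for a copyup, and everything else
 * first issues a STAT to learn whether the target object exists.
 */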
2918
2919 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2920 {
2921 struct rbd_obj_request *obj_request;
2922 struct rbd_obj_request *next_obj_request;
2923 int ret = 0;
2924
2925 dout("%s: img %p\n", __func__, img_request);
2926
2927 rbd_img_request_get(img_request);
2928 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2929 ret = rbd_img_obj_request_submit(obj_request);
2930 if (ret)
2931 goto out_put_ireq;
2932 }
2933
2934 out_put_ireq:
2935 rbd_img_request_put(img_request);
2936 return ret;
2937 }
2938
2939 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2940 {
2941 struct rbd_obj_request *obj_request;
2942 struct rbd_device *rbd_dev;
2943 u64 obj_end;
2944 u64 img_xferred;
2945 int img_result;
2946
2947 rbd_assert(img_request_child_test(img_request));
2948
2949 /* First get what we need from the image request and release it */
2950
2951 obj_request = img_request->obj_request;
2952 img_xferred = img_request->xferred;
2953 img_result = img_request->result;
2954 rbd_img_request_put(img_request);
2955
2956 /*
2957 * If the overlap has become 0 (most likely because the
2958 * image has been flattened) we need to re-submit the
2959 * original request.
2960 */
2961 rbd_assert(obj_request);
2962 rbd_assert(obj_request->img_request);
2963 rbd_dev = obj_request->img_request->rbd_dev;
2964 if (!rbd_dev->parent_overlap) {
2965 rbd_obj_request_submit(obj_request);
2966 return;
2967 }
2968
2969 obj_request->result = img_result;
2970 if (obj_request->result)
2971 goto out;
2972
2973 /*
2974 * We need to zero anything beyond the parent overlap
2975 * boundary. Since rbd_img_obj_request_read_callback()
2976 * will zero anything beyond the end of a short read, an
2977 * easy way to do this is to pretend the data from the
2978 * parent came up short--ending at the overlap boundary.
2979 */
2980 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2981 obj_end = obj_request->img_offset + obj_request->length;
2982 if (obj_end > rbd_dev->parent_overlap) {
2983 u64 xferred = 0;
2984
2985 if (obj_request->img_offset < rbd_dev->parent_overlap)
2986 xferred = rbd_dev->parent_overlap -
2987 obj_request->img_offset;
2988
2989 obj_request->xferred = min(img_xferred, xferred);
2990 } else {
2991 obj_request->xferred = img_xferred;
2992 }
2993 out:
2994 rbd_img_obj_request_read_callback(obj_request);
2995 rbd_obj_request_complete(obj_request);
2996 }
2997
2998 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2999 {
3000 struct rbd_img_request *img_request;
3001 int result;
3002
3003 rbd_assert(obj_request_img_data_test(obj_request));
3004 rbd_assert(obj_request->img_request != NULL);
3005 rbd_assert(obj_request->result == (s32) -ENOENT);
3006 rbd_assert(obj_request_type_valid(obj_request->type));
3007
3008 /* rbd_read_finish(obj_request, obj_request->length); */
3009 img_request = rbd_parent_request_create(obj_request,
3010 obj_request->img_offset,
3011 obj_request->length);
3012 result = -ENOMEM;
3013 if (!img_request)
3014 goto out_err;
3015
3016 if (obj_request->type == OBJ_REQUEST_BIO)
3017 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3018 obj_request->bio_list);
3019 else
3020 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3021 obj_request->pages);
3022 if (result)
3023 goto out_err;
3024
3025 img_request->callback = rbd_img_parent_read_callback;
3026 result = rbd_img_request_submit(img_request);
3027 if (result)
3028 goto out_err;
3029
3030 return;
3031 out_err:
3032 if (img_request)
3033 rbd_img_request_put(img_request);
3034 obj_request->result = result;
3035 obj_request->xferred = 0;
3036 obj_request_done_set(obj_request);
3037 }
3038
3039 static const struct rbd_client_id rbd_empty_cid;
3040
3041 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3042 const struct rbd_client_id *rhs)
3043 {
3044 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3045 }
3046
3047 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3048 {
3049 struct rbd_client_id cid;
3050
3051 mutex_lock(&rbd_dev->watch_mutex);
3052 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3053 cid.handle = rbd_dev->watch_cookie;
3054 mutex_unlock(&rbd_dev->watch_mutex);
3055 return cid;
3056 }
3057
3058 /*
3059 * lock_rwsem must be held for write
3060 */
3061 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3062 const struct rbd_client_id *cid)
3063 {
3064 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3065 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3066 cid->gid, cid->handle);
3067 rbd_dev->owner_cid = *cid; /* struct */
3068 }
3069
3070 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3071 {
3072 mutex_lock(&rbd_dev->watch_mutex);
3073 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3074 mutex_unlock(&rbd_dev->watch_mutex);
3075 }
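/*
 * The cookie ties the lock to this client's watch, e.g. (assuming
 * the "auto" RBD_LOCK_COOKIE_PREFIX) "auto 94574873550864";
 * find_watcher() later parses the watch id back out of it.
 */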
3076
3077 /*
3078 * lock_rwsem must be held for write
3079 */
3080 static int rbd_lock(struct rbd_device *rbd_dev)
3081 {
3082 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3083 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3084 char cookie[32];
3085 int ret;
3086
3087 WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3088 rbd_dev->lock_cookie[0] != '\0');
3089
3090 format_lock_cookie(rbd_dev, cookie);
3091 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3092 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3093 RBD_LOCK_TAG, "", 0);
3094 if (ret)
3095 return ret;
3096
3097 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3098 strcpy(rbd_dev->lock_cookie, cookie);
3099 rbd_set_owner_cid(rbd_dev, &cid);
3100 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3101 return 0;
3102 }
3103
3104 /*
3105 * lock_rwsem must be held for write
3106 */
3107 static void rbd_unlock(struct rbd_device *rbd_dev)
3108 {
3109 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3110 int ret;
3111
3112 WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3113 rbd_dev->lock_cookie[0] == '\0');
3114
3115 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3116 RBD_LOCK_NAME, rbd_dev->lock_cookie);
3117 if (ret && ret != -ENOENT)
3118 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
3119
3120 /* treat errors as the image is unlocked */
3121 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3122 rbd_dev->lock_cookie[0] = '\0';
3123 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3124 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3125 }
3126
3127 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3128 enum rbd_notify_op notify_op,
3129 struct page ***preply_pages,
3130 size_t *preply_len)
3131 {
3132 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3133 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3134 int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3135 char buf[buf_size];
3136 void *p = buf;
3137
3138 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3139
3140 /* encode *LockPayload NotifyMessage (op + ClientId) */
3141 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3142 ceph_encode_32(&p, notify_op);
3143 ceph_encode_64(&p, cid.gid);
3144 ceph_encode_64(&p, cid.handle);
3145
3146 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3147 &rbd_dev->header_oloc, buf, buf_size,
3148 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3149 }
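/*
 * The payload encoded above is, after the standard start-of-encoding
 * block (struct_v 2, compat 1, length):
 *
 *	le32 notify_op
 *	le64 gid	\ ClientId of the sender
 *	le64 handle	/
 */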
3150
3151 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3152 enum rbd_notify_op notify_op)
3153 {
3154 struct page **reply_pages;
3155 size_t reply_len;
3156
3157 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3158 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3159 }
3160
3161 static void rbd_notify_acquired_lock(struct work_struct *work)
3162 {
3163 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3164 acquired_lock_work);
3165
3166 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3167 }
3168
3169 static void rbd_notify_released_lock(struct work_struct *work)
3170 {
3171 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3172 released_lock_work);
3173
3174 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3175 }
3176
3177 static int rbd_request_lock(struct rbd_device *rbd_dev)
3178 {
3179 struct page **reply_pages;
3180 size_t reply_len;
3181 bool lock_owner_responded = false;
3182 int ret;
3183
3184 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3185
3186 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3187 &reply_pages, &reply_len);
3188 if (ret && ret != -ETIMEDOUT) {
3189 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3190 goto out;
3191 }
3192
3193 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3194 void *p = page_address(reply_pages[0]);
3195 void *const end = p + reply_len;
3196 u32 n;
3197
3198 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3199 while (n--) {
3200 u8 struct_v;
3201 u32 len;
3202
3203 ceph_decode_need(&p, end, 8 + 8, e_inval);
3204 p += 8 + 8; /* skip gid and cookie */
3205
3206 ceph_decode_32_safe(&p, end, len, e_inval);
3207 if (!len)
3208 continue;
3209
3210 if (lock_owner_responded) {
3211 rbd_warn(rbd_dev,
3212 "duplicate lock owners detected");
3213 ret = -EIO;
3214 goto out;
3215 }
3216
3217 lock_owner_responded = true;
3218 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3219 &struct_v, &len);
3220 if (ret) {
3221 rbd_warn(rbd_dev,
3222 "failed to decode ResponseMessage: %d",
3223 ret);
3224 goto e_inval;
3225 }
3226
3227 ret = ceph_decode_32(&p);
3228 }
3229 }
3230
3231 if (!lock_owner_responded) {
3232 rbd_warn(rbd_dev, "no lock owners detected");
3233 ret = -ETIMEDOUT;
3234 }
3235
3236 out:
3237 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3238 return ret;
3239
3240 e_inval:
3241 ret = -EINVAL;
3242 goto out;
3243 }
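/*
 * The notify reply decoded above has the form:
 *
 *	le32 num_acks
 *	num_acks * { le64 gid, le64 cookie, le32 len, payload[len] }
 *
 * and only the current lock owner is expected to respond with a
 * non-empty payload: an encoded ResponseMessage carrying an le32
 * result.
 */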
3244
3245 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3246 {
3247 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3248
3249 cancel_delayed_work(&rbd_dev->lock_dwork);
3250 if (wake_all)
3251 wake_up_all(&rbd_dev->lock_waitq);
3252 else
3253 wake_up(&rbd_dev->lock_waitq);
3254 }
3255
3256 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3257 struct ceph_locker **lockers, u32 *num_lockers)
3258 {
3259 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3260 u8 lock_type;
3261 char *lock_tag;
3262 int ret;
3263
3264 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3265
3266 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3267 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3268 &lock_type, &lock_tag, lockers, num_lockers);
3269 if (ret)
3270 return ret;
3271
3272 if (*num_lockers == 0) {
3273 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3274 goto out;
3275 }
3276
3277 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3278 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3279 lock_tag);
3280 ret = -EBUSY;
3281 goto out;
3282 }
3283
3284 if (lock_type == CEPH_CLS_LOCK_SHARED) {
3285 rbd_warn(rbd_dev, "shared lock type detected");
3286 ret = -EBUSY;
3287 goto out;
3288 }
3289
3290 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3291 strlen(RBD_LOCK_COOKIE_PREFIX))) {
3292 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3293 (*lockers)[0].id.cookie);
3294 ret = -EBUSY;
3295 goto out;
3296 }
3297
3298 out:
3299 kfree(lock_tag);
3300 return ret;
3301 }
3302
3303 static int find_watcher(struct rbd_device *rbd_dev,
3304 const struct ceph_locker *locker)
3305 {
3306 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3307 struct ceph_watch_item *watchers;
3308 u32 num_watchers;
3309 u64 cookie;
3310 int i;
3311 int ret;
3312
3313 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3314 &rbd_dev->header_oloc, &watchers,
3315 &num_watchers);
3316 if (ret)
3317 return ret;
3318
3319 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3320 for (i = 0; i < num_watchers; i++) {
3321 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3322 sizeof(locker->info.addr)) &&
3323 watchers[i].cookie == cookie) {
3324 struct rbd_client_id cid = {
3325 .gid = le64_to_cpu(watchers[i].name.num),
3326 .handle = cookie,
3327 };
3328
3329 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3330 rbd_dev, cid.gid, cid.handle);
3331 rbd_set_owner_cid(rbd_dev, &cid);
3332 ret = 1;
3333 goto out;
3334 }
3335 }
3336
3337 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3338 ret = 0;
3339 out:
3340 kfree(watchers);
3341 return ret;
3342 }
3343
3344 /*
3345 * lock_rwsem must be held for write
3346 */
3347 static int rbd_try_lock(struct rbd_device *rbd_dev)
3348 {
3349 struct ceph_client *client = rbd_dev->rbd_client->client;
3350 struct ceph_locker *lockers;
3351 u32 num_lockers;
3352 int ret;
3353
3354 for (;;) {
3355 ret = rbd_lock(rbd_dev);
3356 if (ret != -EBUSY)
3357 return ret;
3358
3359 /* determine if the current lock holder is still alive */
3360 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3361 if (ret)
3362 return ret;
3363
3364 if (num_lockers == 0)
3365 goto again;
3366
3367 ret = find_watcher(rbd_dev, lockers);
3368 if (ret) {
3369 if (ret > 0)
3370 ret = 0; /* have to request lock */
3371 goto out;
3372 }
3373
3374 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3375 ENTITY_NAME(lockers[0].id.name));
3376
3377 ret = ceph_monc_blacklist_add(&client->monc,
3378 &lockers[0].info.addr);
3379 if (ret) {
3380 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3381 ENTITY_NAME(lockers[0].id.name), ret);
3382 goto out;
3383 }
3384
3385 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3386 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3387 lockers[0].id.cookie,
3388 &lockers[0].id.name);
3389 if (ret && ret != -ENOENT)
3390 goto out;
3391
3392 again:
3393 ceph_free_lockers(lockers, num_lockers);
3394 }
3395
3396 out:
3397 ceph_free_lockers(lockers, num_lockers);
3398 return ret;
3399 }
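/*
 * In short: keep trying to take the lock; if it is held, check
 * whether the holder still has a watch on the header object.  A
 * holder without a watch is presumed dead, so it is blacklisted and
 * its lock broken before we loop around and try again.
 */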
3400
3401 /*
3402 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3403 */
3404 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3405 int *pret)
3406 {
3407 enum rbd_lock_state lock_state;
3408
3409 down_read(&rbd_dev->lock_rwsem);
3410 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3411 rbd_dev->lock_state);
3412 if (__rbd_is_lock_owner(rbd_dev)) {
3413 lock_state = rbd_dev->lock_state;
3414 up_read(&rbd_dev->lock_rwsem);
3415 return lock_state;
3416 }
3417
3418 up_read(&rbd_dev->lock_rwsem);
3419 down_write(&rbd_dev->lock_rwsem);
3420 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3421 rbd_dev->lock_state);
3422 if (!__rbd_is_lock_owner(rbd_dev)) {
3423 *pret = rbd_try_lock(rbd_dev);
3424 if (*pret)
3425 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3426 }
3427
3428 lock_state = rbd_dev->lock_state;
3429 up_write(&rbd_dev->lock_rwsem);
3430 return lock_state;
3431 }
3432
3433 static void rbd_acquire_lock(struct work_struct *work)
3434 {
3435 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3436 struct rbd_device, lock_dwork);
3437 enum rbd_lock_state lock_state;
3438 int ret;
3439
3440 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3441 again:
3442 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3443 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3444 if (lock_state == RBD_LOCK_STATE_LOCKED)
3445 wake_requests(rbd_dev, true);
3446 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3447 rbd_dev, lock_state, ret);
3448 return;
3449 }
3450
3451 ret = rbd_request_lock(rbd_dev);
3452 if (ret == -ETIMEDOUT) {
3453 goto again; /* treat this as a dead client */
3454 } else if (ret == -EROFS) {
3455 rbd_warn(rbd_dev, "peer will not release lock");
3456 /*
3457 * If this is rbd_add_acquire_lock(), we want to fail
3458 * immediately -- reuse BLACKLISTED flag. Otherwise we
3459 * want to block.
3460 */
3461 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3462 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3463 /* wake "rbd map --exclusive" process */
3464 wake_requests(rbd_dev, false);
3465 }
3466 } else if (ret < 0) {
3467 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3468 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3469 RBD_RETRY_DELAY);
3470 } else {
3471 /*
3472 * lock owner acked, but resend if we don't see them
3473 * release the lock
3474 */
3475 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3476 rbd_dev);
3477 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3478 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3479 }
3480 }
3481
3482 /*
3483 * lock_rwsem must be held for write
3484 */
3485 static bool rbd_release_lock(struct rbd_device *rbd_dev)
3486 {
3487 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3488 rbd_dev->lock_state);
3489 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3490 return false;
3491
3492 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3493 downgrade_write(&rbd_dev->lock_rwsem);
3494 /*
3495 * Ensure that all in-flight IO is flushed.
3496 *
3497 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3498 * may be shared with other devices.
3499 */
3500 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3501 up_read(&rbd_dev->lock_rwsem);
3502
3503 down_write(&rbd_dev->lock_rwsem);
3504 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3505 rbd_dev->lock_state);
3506 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3507 return false;
3508
3509 rbd_unlock(rbd_dev);
3510 /*
3511 * Give others a chance to grab the lock - we would re-acquire
3512 * almost immediately if we got new IO during ceph_osdc_sync()
3513 * otherwise. We need to ack our own notifications, so this
3514 * lock_dwork will be requeued from rbd_wait_state_locked()
3515 * after wake_requests() in rbd_handle_released_lock().
3516 */
3517 cancel_delayed_work(&rbd_dev->lock_dwork);
3518 return true;
3519 }
3520
3521 static void rbd_release_lock_work(struct work_struct *work)
3522 {
3523 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3524 unlock_work);
3525
3526 down_write(&rbd_dev->lock_rwsem);
3527 rbd_release_lock(rbd_dev);
3528 up_write(&rbd_dev->lock_rwsem);
3529 }
3530
3531 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3532 void **p)
3533 {
3534 struct rbd_client_id cid = { 0 };
3535
3536 if (struct_v >= 2) {
3537 cid.gid = ceph_decode_64(p);
3538 cid.handle = ceph_decode_64(p);
3539 }
3540
3541 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3542 cid.handle);
3543 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3544 down_write(&rbd_dev->lock_rwsem);
3545 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3546 /*
3547 * we already know that the remote client is
3548 * the owner
3549 */
3550 up_write(&rbd_dev->lock_rwsem);
3551 return;
3552 }
3553
3554 rbd_set_owner_cid(rbd_dev, &cid);
3555 downgrade_write(&rbd_dev->lock_rwsem);
3556 } else {
3557 down_read(&rbd_dev->lock_rwsem);
3558 }
3559
3560 if (!__rbd_is_lock_owner(rbd_dev))
3561 wake_requests(rbd_dev, false);
3562 up_read(&rbd_dev->lock_rwsem);
3563 }
3564
3565 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3566 void **p)
3567 {
3568 struct rbd_client_id cid = { 0 };
3569
3570 if (struct_v >= 2) {
3571 cid.gid = ceph_decode_64(p);
3572 cid.handle = ceph_decode_64(p);
3573 }
3574
3575 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3576 cid.handle);
3577 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3578 down_write(&rbd_dev->lock_rwsem);
3579 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3580 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3581 __func__, rbd_dev, cid.gid, cid.handle,
3582 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3583 up_write(&rbd_dev->lock_rwsem);
3584 return;
3585 }
3586
3587 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3588 downgrade_write(&rbd_dev->lock_rwsem);
3589 } else {
3590 down_read(&rbd_dev->lock_rwsem);
3591 }
3592
3593 if (!__rbd_is_lock_owner(rbd_dev))
3594 wake_requests(rbd_dev, false);
3595 up_read(&rbd_dev->lock_rwsem);
3596 }
3597
3598 /*
3599 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3600 * ResponseMessage is needed.
3601 */
3602 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3603 void **p)
3604 {
3605 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3606 struct rbd_client_id cid = { 0 };
3607 int result = 1;
3608
3609 if (struct_v >= 2) {
3610 cid.gid = ceph_decode_64(p);
3611 cid.handle = ceph_decode_64(p);
3612 }
3613
3614 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3615 cid.handle);
3616 if (rbd_cid_equal(&cid, &my_cid))
3617 return result;
3618
3619 down_read(&rbd_dev->lock_rwsem);
3620 if (__rbd_is_lock_owner(rbd_dev)) {
3621 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3622 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3623 goto out_unlock;
3624
3625 /*
3626 * encode ResponseMessage(0) so the peer can detect
3627 * a missing owner
3628 */
3629 result = 0;
3630
3631 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3632 if (!rbd_dev->opts->exclusive) {
3633 dout("%s rbd_dev %p queueing unlock_work\n",
3634 __func__, rbd_dev);
3635 queue_work(rbd_dev->task_wq,
3636 &rbd_dev->unlock_work);
3637 } else {
3638 /* refuse to release the lock */
3639 result = -EROFS;
3640 }
3641 }
3642 }
3643
3644 out_unlock:
3645 up_read(&rbd_dev->lock_rwsem);
3646 return result;
3647 }
3648
3649 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3650 u64 notify_id, u64 cookie, s32 *result)
3651 {
3652 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3653 int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3654 char buf[buf_size];
3655 int ret;
3656
3657 if (result) {
3658 void *p = buf;
3659
3660 /* encode ResponseMessage */
3661 ceph_start_encoding(&p, 1, 1,
3662 buf_size - CEPH_ENCODING_START_BLK_LEN);
3663 ceph_encode_32(&p, *result);
3664 } else {
3665 buf_size = 0;
3666 }
3667
3668 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3669 &rbd_dev->header_oloc, notify_id, cookie,
3670 buf, buf_size);
3671 if (ret)
3672 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3673 }
3674
3675 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3676 u64 cookie)
3677 {
3678 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3679 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3680 }
3681
3682 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3683 u64 notify_id, u64 cookie, s32 result)
3684 {
3685 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3686 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3687 }
3688
3689 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3690 u64 notifier_id, void *data, size_t data_len)
3691 {
3692 struct rbd_device *rbd_dev = arg;
3693 void *p = data;
3694 void *const end = p + data_len;
3695 u8 struct_v = 0;
3696 u32 len;
3697 u32 notify_op;
3698 int ret;
3699
3700 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3701 __func__, rbd_dev, cookie, notify_id, data_len);
3702 if (data_len) {
3703 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3704 &struct_v, &len);
3705 if (ret) {
3706 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3707 ret);
3708 return;
3709 }
3710
3711 notify_op = ceph_decode_32(&p);
3712 } else {
3713 /* legacy notification for header updates */
3714 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3715 len = 0;
3716 }
3717
3718 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3719 switch (notify_op) {
3720 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3721 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3722 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3723 break;
3724 case RBD_NOTIFY_OP_RELEASED_LOCK:
3725 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3726 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3727 break;
3728 case RBD_NOTIFY_OP_REQUEST_LOCK:
3729 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3730 if (ret <= 0)
3731 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3732 cookie, ret);
3733 else
3734 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3735 break;
3736 case RBD_NOTIFY_OP_HEADER_UPDATE:
3737 ret = rbd_dev_refresh(rbd_dev);
3738 if (ret)
3739 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3740
3741 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3742 break;
3743 default:
3744 if (rbd_is_lock_owner(rbd_dev))
3745 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3746 cookie, -EOPNOTSUPP);
3747 else
3748 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3749 break;
3750 }
3751 }
3752
3753 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3754
3755 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3756 {
3757 struct rbd_device *rbd_dev = arg;
3758
3759 rbd_warn(rbd_dev, "encountered watch error: %d", err);
3760
3761 down_write(&rbd_dev->lock_rwsem);
3762 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3763 up_write(&rbd_dev->lock_rwsem);
3764
3765 mutex_lock(&rbd_dev->watch_mutex);
3766 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3767 __rbd_unregister_watch(rbd_dev);
3768 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3769
3770 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3771 }
3772 mutex_unlock(&rbd_dev->watch_mutex);
3773 }
3774
3775 /*
3776 * watch_mutex must be locked
3777 */
3778 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3779 {
3780 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3781 struct ceph_osd_linger_request *handle;
3782
3783 rbd_assert(!rbd_dev->watch_handle);
3784 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3785
3786 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3787 &rbd_dev->header_oloc, rbd_watch_cb,
3788 rbd_watch_errcb, rbd_dev);
3789 if (IS_ERR(handle))
3790 return PTR_ERR(handle);
3791
3792 rbd_dev->watch_handle = handle;
3793 return 0;
3794 }
3795
3796 /*
3797 * watch_mutex must be locked
3798 */
3799 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3800 {
3801 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3802 int ret;
3803
3804 rbd_assert(rbd_dev->watch_handle);
3805 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3806
3807 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3808 if (ret)
3809 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3810
3811 rbd_dev->watch_handle = NULL;
3812 }
3813
3814 static int rbd_register_watch(struct rbd_device *rbd_dev)
3815 {
3816 int ret;
3817
3818 mutex_lock(&rbd_dev->watch_mutex);
3819 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3820 ret = __rbd_register_watch(rbd_dev);
3821 if (ret)
3822 goto out;
3823
3824 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3825 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3826
3827 out:
3828 mutex_unlock(&rbd_dev->watch_mutex);
3829 return ret;
3830 }
3831
3832 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3833 {
3834 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3835
3836 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3837 cancel_work_sync(&rbd_dev->acquired_lock_work);
3838 cancel_work_sync(&rbd_dev->released_lock_work);
3839 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3840 cancel_work_sync(&rbd_dev->unlock_work);
3841 }
3842
3843 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3844 {
3845 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3846 cancel_tasks_sync(rbd_dev);
3847
3848 mutex_lock(&rbd_dev->watch_mutex);
3849 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3850 __rbd_unregister_watch(rbd_dev);
3851 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3852 mutex_unlock(&rbd_dev->watch_mutex);
3853
3854 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3855 }
3856
3857 /*
3858 * lock_rwsem must be held for write
3859 */
3860 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3861 {
3862 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3863 char cookie[32];
3864 int ret;
3865
3866 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3867
3868 format_lock_cookie(rbd_dev, cookie);
3869 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3870 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3871 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3872 RBD_LOCK_TAG, cookie);
3873 if (ret) {
3874 if (ret != -EOPNOTSUPP)
3875 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3876 ret);
3877
3878 /*
3879 * Lock cookie cannot be updated on older OSDs, so do
3880 * a manual release and queue an acquire.
3881 */
3882 if (rbd_release_lock(rbd_dev))
3883 queue_delayed_work(rbd_dev->task_wq,
3884 &rbd_dev->lock_dwork, 0);
3885 } else {
3886 strcpy(rbd_dev->lock_cookie, cookie);
3887 }
3888 }
3889
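/*
 * Runs from watch_dwork, queued by rbd_watch_errcb() after a watch
 * error. Re-establishes the watch and, if we held the exclusive
 * lock, reacquires it and refreshes the header.
 */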
3890 static void rbd_reregister_watch(struct work_struct *work)
3891 {
3892 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3893 struct rbd_device, watch_dwork);
3894 int ret;
3895
3896 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3897
3898 mutex_lock(&rbd_dev->watch_mutex);
3899 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3900 mutex_unlock(&rbd_dev->watch_mutex);
3901 return;
3902 }
3903
3904 ret = __rbd_register_watch(rbd_dev);
3905 if (ret) {
3906 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3907 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3908 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3909 wake_requests(rbd_dev, true);
3910 } else {
3911 queue_delayed_work(rbd_dev->task_wq,
3912 &rbd_dev->watch_dwork,
3913 RBD_RETRY_DELAY);
3914 }
3915 mutex_unlock(&rbd_dev->watch_mutex);
3916 return;
3917 }
3918
3919 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3920 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3921 mutex_unlock(&rbd_dev->watch_mutex);
3922
3923 down_write(&rbd_dev->lock_rwsem);
3924 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3925 rbd_reacquire_lock(rbd_dev);
3926 up_write(&rbd_dev->lock_rwsem);
3927
3928 ret = rbd_dev_refresh(rbd_dev);
3929 if (ret)
3930 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3931 }
3932
3933 /*
3934 * Synchronous osd object method call. Returns the number of bytes
3935 * returned in the inbound buffer, or a negative error code.
3936 */
3937 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3938 struct ceph_object_id *oid,
3939 struct ceph_object_locator *oloc,
3940 const char *method_name,
3941 const void *outbound,
3942 size_t outbound_size,
3943 void *inbound,
3944 size_t inbound_size)
3945 {
3946 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3947 struct page *req_page = NULL;
3948 struct page *reply_page;
3949 int ret;
3950
3951 /*
3952 * Method calls are ultimately read operations. The result
3953 * should be placed into the inbound buffer provided. Callers
3954 * may also supply outbound data--parameters for the object
3955 * method. Currently, if this is present, it will be a
3956 * snapshot id.
3957 */
3958 if (outbound) {
3959 if (outbound_size > PAGE_SIZE)
3960 return -E2BIG;
3961
3962 req_page = alloc_page(GFP_KERNEL);
3963 if (!req_page)
3964 return -ENOMEM;
3965
3966 memcpy(page_address(req_page), outbound, outbound_size);
3967 }
3968
3969 reply_page = alloc_page(GFP_KERNEL);
3970 if (!reply_page) {
3971 if (req_page)
3972 __free_page(req_page);
3973 return -ENOMEM;
3974 }
3975
3976 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3977 CEPH_OSD_FLAG_READ, req_page, outbound_size,
3978 reply_page, &inbound_size);
3979 if (!ret) {
3980 memcpy(inbound, page_address(reply_page), inbound_size);
3981 ret = inbound_size;
3982 }
3983
3984 if (req_page)
3985 __free_page(req_page);
3986 __free_page(reply_page);
3987 return ret;
3988 }
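
/*
 * Illustrative call (a sketch mirroring _rbd_dev_v2_snap_size()
 * below):
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct { u8 order; __le64 size; } __attribute__ ((packed)) buf;
 *	int ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
 *				      &rbd_dev->header_oloc, "get_size",
 *				      &snapid, sizeof(snapid),
 *				      &buf, sizeof(buf));
 */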
3989
3990 /*
3991 * lock_rwsem must be held for read
3992 */
3993 static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3994 {
3995 DEFINE_WAIT(wait);
3996
3997 do {
3998 /*
3999 * Note the use of mod_delayed_work() in rbd_acquire_lock()
4000 * and cancel_delayed_work() in wake_requests().
4001 */
4002 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4003 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4004 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4005 TASK_UNINTERRUPTIBLE);
4006 up_read(&rbd_dev->lock_rwsem);
4007 schedule();
4008 down_read(&rbd_dev->lock_rwsem);
4009 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4010 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4011
4012 finish_wait(&rbd_dev->lock_waitq, &wait);
4013 }
4014
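/*
 * Process a single block layer request. Runs from the rbd workqueue
 * (see rbd_queue_rq()): translates the request into an image request
 * and submits it to the OSDs, waiting for the exclusive lock first
 * when the mapping requires it.
 */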
4015 static void rbd_queue_workfn(struct work_struct *work)
4016 {
4017 struct request *rq = blk_mq_rq_from_pdu(work);
4018 struct rbd_device *rbd_dev = rq->q->queuedata;
4019 struct rbd_img_request *img_request;
4020 struct ceph_snap_context *snapc = NULL;
4021 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4022 u64 length = blk_rq_bytes(rq);
4023 enum obj_operation_type op_type;
4024 u64 mapping_size;
4025 bool must_be_locked;
4026 int result;
4027
4028 switch (req_op(rq)) {
4029 case REQ_OP_DISCARD:
4030 case REQ_OP_WRITE_ZEROES:
4031 op_type = OBJ_OP_DISCARD;
4032 break;
4033 case REQ_OP_WRITE:
4034 op_type = OBJ_OP_WRITE;
4035 break;
4036 case REQ_OP_READ:
4037 op_type = OBJ_OP_READ;
4038 break;
4039 default:
4040 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4041 result = -EIO;
4042 goto err;
4043 }
4044
4045 /* Ignore/skip any zero-length requests */
4046
4047 if (!length) {
4048 dout("%s: zero-length request\n", __func__);
4049 result = 0;
4050 goto err_rq;
4051 }
4052
4053 /* Only reads are allowed to a read-only device */
4054
4055 if (op_type != OBJ_OP_READ) {
4056 if (rbd_dev->mapping.read_only) {
4057 result = -EROFS;
4058 goto err_rq;
4059 }
4060 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4061 }
4062
4063 /*
4064 * Quit early if the mapped snapshot no longer exists. It's
4065 * still possible the snapshot will have disappeared by the
4066 * time our request arrives at the osd, but there's no sense in
4067 * sending it if we already know.
4068 */
4069 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4070 dout("request for non-existent snapshot");
4071 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4072 result = -ENXIO;
4073 goto err_rq;
4074 }
4075
4076 if (offset && length > U64_MAX - offset + 1) {
4077 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4078 length);
4079 result = -EINVAL;
4080 goto err_rq; /* Shouldn't happen */
4081 }
4082
4083 blk_mq_start_request(rq);
4084
4085 down_read(&rbd_dev->header_rwsem);
4086 mapping_size = rbd_dev->mapping.size;
4087 if (op_type != OBJ_OP_READ) {
4088 snapc = rbd_dev->header.snapc;
4089 ceph_get_snap_context(snapc);
4090 }
4091 up_read(&rbd_dev->header_rwsem);
4092
4093 if (offset + length > mapping_size) {
4094 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4095 length, mapping_size);
4096 result = -EIO;
4097 goto err_rq;
4098 }
4099
4100 must_be_locked =
4101 (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
4102 (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
4103 if (must_be_locked) {
4104 down_read(&rbd_dev->lock_rwsem);
4105 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4106 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4107 if (rbd_dev->opts->exclusive) {
4108 rbd_warn(rbd_dev, "exclusive lock required");
4109 result = -EROFS;
4110 goto err_unlock;
4111 }
4112 rbd_wait_state_locked(rbd_dev);
4113 }
4114 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4115 result = -EBLACKLISTED;
4116 goto err_unlock;
4117 }
4118 }
4119
4120 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4121 snapc);
4122 if (!img_request) {
4123 result = -ENOMEM;
4124 goto err_unlock;
4125 }
4126 img_request->rq = rq;
4127 snapc = NULL; /* img_request consumes a ref */
4128
4129 if (op_type == OBJ_OP_DISCARD)
4130 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4131 NULL);
4132 else
4133 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4134 rq->bio);
4135 if (result)
4136 goto err_img_request;
4137
4138 result = rbd_img_request_submit(img_request);
4139 if (result)
4140 goto err_img_request;
4141
4142 if (must_be_locked)
4143 up_read(&rbd_dev->lock_rwsem);
4144 return;
4145
4146 err_img_request:
4147 rbd_img_request_put(img_request);
4148 err_unlock:
4149 if (must_be_locked)
4150 up_read(&rbd_dev->lock_rwsem);
4151 err_rq:
4152 if (result)
4153 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4154 obj_op_name(op_type), length, offset, result);
4155 ceph_put_snap_context(snapc);
4156 err:
4157 blk_mq_end_request(rq, errno_to_blk_status(result));
4158 }
4159
4160 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4161 const struct blk_mq_queue_data *bd)
4162 {
4163 struct request *rq = bd->rq;
4164 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4165
4166 queue_work(rbd_wq, work);
4167 return BLK_STS_OK;
4168 }
4169
4170 static void rbd_free_disk(struct rbd_device *rbd_dev)
4171 {
4172 blk_cleanup_queue(rbd_dev->disk->queue);
4173 blk_mq_free_tag_set(&rbd_dev->tag_set);
4174 put_disk(rbd_dev->disk);
4175 rbd_dev->disk = NULL;
4176 }
4177
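/*
 * Synchronously read up to buf_len bytes from the given object into
 * buf. Returns the number of bytes read or a negative error code.
 */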
4178 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4179 struct ceph_object_id *oid,
4180 struct ceph_object_locator *oloc,
4181 void *buf, int buf_len)
4183 {
4184 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4185 struct ceph_osd_request *req;
4186 struct page **pages;
4187 int num_pages = calc_pages_for(0, buf_len);
4188 int ret;
4189
4190 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4191 if (!req)
4192 return -ENOMEM;
4193
4194 ceph_oid_copy(&req->r_base_oid, oid);
4195 ceph_oloc_copy(&req->r_base_oloc, oloc);
4196 req->r_flags = CEPH_OSD_FLAG_READ;
4197
4198 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4199 if (ret)
4200 goto out_req;
4201
4202 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4203 if (IS_ERR(pages)) {
4204 ret = PTR_ERR(pages);
4205 goto out_req;
4206 }
4207
4208 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4209 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4210 true);
4211
4212 ceph_osdc_start_request(osdc, req, false);
4213 ret = ceph_osdc_wait_request(osdc, req);
4214 if (ret >= 0)
4215 ceph_copy_from_page_vector(pages, buf, 0, ret);
4216
4217 out_req:
4218 ceph_osdc_put_request(req);
4219 return ret;
4220 }
4221
4222 /*
4223 * Read the complete header for the given rbd device. On successful
4224 * return, the rbd_dev->header field will contain up-to-date
4225 * information about the image.
4226 */
4227 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4228 {
4229 struct rbd_image_header_ondisk *ondisk = NULL;
4230 u32 snap_count = 0;
4231 u64 names_size = 0;
4232 u32 want_count;
4233 int ret;
4234
4235 /*
4236 * The complete header will include an array of its 64-bit
4237 * snapshot ids, followed by the names of those snapshots as
4238 * a contiguous block of NUL-terminated strings. Note that
4239 * the number of snapshots could change by the time we read
4240 * it in, in which case we re-read it.
4241 */
4242 do {
4243 size_t size;
4244
4245 kfree(ondisk);
4246
4247 size = sizeof (*ondisk);
4248 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4249 size += names_size;
4250 ondisk = kmalloc(size, GFP_KERNEL);
4251 if (!ondisk)
4252 return -ENOMEM;
4253
4254 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4255 &rbd_dev->header_oloc, ondisk, size);
4256 if (ret < 0)
4257 goto out;
4258 if ((size_t)ret < size) {
4259 ret = -ENXIO;
4260 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4261 size, ret);
4262 goto out;
4263 }
4264 if (!rbd_dev_ondisk_valid(ondisk)) {
4265 ret = -ENXIO;
4266 rbd_warn(rbd_dev, "invalid header");
4267 goto out;
4268 }
4269
4270 names_size = le64_to_cpu(ondisk->snap_names_len);
4271 want_count = snap_count;
4272 snap_count = le32_to_cpu(ondisk->snap_count);
4273 } while (snap_count != want_count);
4274
4275 ret = rbd_header_from_disk(rbd_dev, ondisk);
4276 out:
4277 kfree(ondisk);
4278
4279 return ret;
4280 }
4281
4282 /*
4283 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4284 * has disappeared from the (just updated) snapshot context.
4285 */
4286 static void rbd_exists_validate(struct rbd_device *rbd_dev)
4287 {
4288 u64 snap_id;
4289
4290 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4291 return;
4292
4293 snap_id = rbd_dev->spec->snap_id;
4294 if (snap_id == CEPH_NOSNAP)
4295 return;
4296
4297 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4298 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4299 }
4300
4301 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4302 {
4303 sector_t size;
4304
4305 /*
4306 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4307 * try to update its size. If REMOVING is set, updating size
4308 * is just useless work since the device can't be opened.
4309 */
4310 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4311 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4312 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4313 dout("setting size to %llu sectors", (unsigned long long)size);
4314 set_capacity(rbd_dev->disk, size);
4315 revalidate_disk(rbd_dev->disk);
4316 }
4317 }
4318
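/*
 * Re-read the image header and update the mapping to match. Called
 * on header-update notifications, after watch re-registration, and
 * from the sysfs "refresh" attribute.
 */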
4319 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4320 {
4321 u64 mapping_size;
4322 int ret;
4323
4324 down_write(&rbd_dev->header_rwsem);
4325 mapping_size = rbd_dev->mapping.size;
4326
4327 ret = rbd_dev_header_info(rbd_dev);
4328 if (ret)
4329 goto out;
4330
4331 /*
4332 * If there is a parent, see if it has disappeared due to the
4333 * mapped image getting flattened.
4334 */
4335 if (rbd_dev->parent) {
4336 ret = rbd_dev_v2_parent_info(rbd_dev);
4337 if (ret)
4338 goto out;
4339 }
4340
4341 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
4342 rbd_dev->mapping.size = rbd_dev->header.image_size;
4343 } else {
4344 /* validate mapped snapshot's EXISTS flag */
4345 rbd_exists_validate(rbd_dev);
4346 }
4347
4348 out:
4349 up_write(&rbd_dev->header_rwsem);
4350 if (!ret && mapping_size != rbd_dev->mapping.size)
4351 rbd_dev_update_size(rbd_dev);
4352
4353 return ret;
4354 }
4355
4356 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
4357 unsigned int hctx_idx, unsigned int numa_node)
4358 {
4359 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4360
4361 INIT_WORK(work, rbd_queue_workfn);
4362 return 0;
4363 }
4364
4365 static const struct blk_mq_ops rbd_mq_ops = {
4366 .queue_rq = rbd_queue_rq,
4367 .init_request = rbd_init_request,
4368 };
4369
4370 static int rbd_init_disk(struct rbd_device *rbd_dev)
4371 {
4372 struct gendisk *disk;
4373 struct request_queue *q;
4374 u64 segment_size;
4375 int err;
4376
4377 /* create gendisk info */
4378 disk = alloc_disk(single_major ?
4379 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4380 RBD_MINORS_PER_MAJOR);
4381 if (!disk)
4382 return -ENOMEM;
4383
4384 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4385 rbd_dev->dev_id);
4386 disk->major = rbd_dev->major;
4387 disk->first_minor = rbd_dev->minor;
4388 if (single_major)
4389 disk->flags |= GENHD_FL_EXT_DEVT;
4390 disk->fops = &rbd_bd_ops;
4391 disk->private_data = rbd_dev;
4392
4393 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4394 rbd_dev->tag_set.ops = &rbd_mq_ops;
4395 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4396 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4397 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4398 rbd_dev->tag_set.nr_hw_queues = 1;
4399 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4400
4401 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4402 if (err)
4403 goto out_disk;
4404
4405 q = blk_mq_init_queue(&rbd_dev->tag_set);
4406 if (IS_ERR(q)) {
4407 err = PTR_ERR(q);
4408 goto out_tag_set;
4409 }
4410
4411 queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4412 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4413
4414 /* set io sizes to object size */
4415 segment_size = rbd_obj_bytes(&rbd_dev->header);
4416 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4417 q->limits.max_sectors = queue_max_hw_sectors(q);
4418 blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
4419 blk_queue_max_segment_size(q, segment_size);
4420 blk_queue_io_min(q, segment_size);
4421 blk_queue_io_opt(q, segment_size);
4422
4423 /* enable the discard support */
4424 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4425 q->limits.discard_granularity = segment_size;
4426 q->limits.discard_alignment = segment_size;
4427 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
4428 blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
4429
4430 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4431 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
4432
4433 /*
4434 * disk_release() expects a queue ref from add_disk() and will
4435 * put it. Hold an extra ref until add_disk() is called.
4436 */
4437 WARN_ON(!blk_get_queue(q));
4438 disk->queue = q;
4439 q->queuedata = rbd_dev;
4440
4441 rbd_dev->disk = disk;
4442
4443 return 0;
4444 out_tag_set:
4445 blk_mq_free_tag_set(&rbd_dev->tag_set);
4446 out_disk:
4447 put_disk(disk);
4448 return err;
4449 }
4450
4451 /*
4452 * sysfs
4453 */
4454
4455 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4456 {
4457 return container_of(dev, struct rbd_device, dev);
4458 }
4459
4460 static ssize_t rbd_size_show(struct device *dev,
4461 struct device_attribute *attr, char *buf)
4462 {
4463 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4464
4465 return sprintf(buf, "%llu\n",
4466 (unsigned long long)rbd_dev->mapping.size);
4467 }
4468
4469 /*
4470 * Note this shows the features for whatever's mapped, which is not
4471 * necessarily the base image.
4472 */
4473 static ssize_t rbd_features_show(struct device *dev,
4474 struct device_attribute *attr, char *buf)
4475 {
4476 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4477
4478 return sprintf(buf, "0x%016llx\n",
4479 (unsigned long long)rbd_dev->mapping.features);
4480 }
4481
4482 static ssize_t rbd_major_show(struct device *dev,
4483 struct device_attribute *attr, char *buf)
4484 {
4485 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4486
4487 if (rbd_dev->major)
4488 return sprintf(buf, "%d\n", rbd_dev->major);
4489
4490 return sprintf(buf, "(none)\n");
4491 }
4492
4493 static ssize_t rbd_minor_show(struct device *dev,
4494 struct device_attribute *attr, char *buf)
4495 {
4496 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4497
4498 return sprintf(buf, "%d\n", rbd_dev->minor);
4499 }
4500
4501 static ssize_t rbd_client_addr_show(struct device *dev,
4502 struct device_attribute *attr, char *buf)
4503 {
4504 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4505 struct ceph_entity_addr *client_addr =
4506 ceph_client_addr(rbd_dev->rbd_client->client);
4507
4508 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4509 le32_to_cpu(client_addr->nonce));
4510 }
4511
4512 static ssize_t rbd_client_id_show(struct device *dev,
4513 struct device_attribute *attr, char *buf)
4514 {
4515 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4516
4517 return sprintf(buf, "client%lld\n",
4518 ceph_client_gid(rbd_dev->rbd_client->client));
4519 }
4520
4521 static ssize_t rbd_cluster_fsid_show(struct device *dev,
4522 struct device_attribute *attr, char *buf)
4523 {
4524 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4525
4526 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4527 }
4528
4529 static ssize_t rbd_config_info_show(struct device *dev,
4530 struct device_attribute *attr, char *buf)
4531 {
4532 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4533
4534 return sprintf(buf, "%s\n", rbd_dev->config_info);
4535 }
4536
4537 static ssize_t rbd_pool_show(struct device *dev,
4538 struct device_attribute *attr, char *buf)
4539 {
4540 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4541
4542 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4543 }
4544
4545 static ssize_t rbd_pool_id_show(struct device *dev,
4546 struct device_attribute *attr, char *buf)
4547 {
4548 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4549
4550 return sprintf(buf, "%llu\n",
4551 (unsigned long long) rbd_dev->spec->pool_id);
4552 }
4553
4554 static ssize_t rbd_name_show(struct device *dev,
4555 struct device_attribute *attr, char *buf)
4556 {
4557 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4558
4559 if (rbd_dev->spec->image_name)
4560 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4561
4562 return sprintf(buf, "(unknown)\n");
4563 }
4564
4565 static ssize_t rbd_image_id_show(struct device *dev,
4566 struct device_attribute *attr, char *buf)
4567 {
4568 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4569
4570 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4571 }
4572
4573 /*
4574 * Shows the name of the currently-mapped snapshot (or
4575 * RBD_SNAP_HEAD_NAME for the base image).
4576 */
4577 static ssize_t rbd_snap_show(struct device *dev,
4578 struct device_attribute *attr,
4579 char *buf)
4580 {
4581 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4582
4583 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4584 }
4585
4586 static ssize_t rbd_snap_id_show(struct device *dev,
4587 struct device_attribute *attr, char *buf)
4588 {
4589 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4590
4591 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4592 }
4593
4594 /*
4595 * For a v2 image, shows the chain of parent images, separated by empty
4596 * lines. For v1 images or if there is no parent, shows "(no parent
4597 * image)".
4598 */
4599 static ssize_t rbd_parent_show(struct device *dev,
4600 struct device_attribute *attr,
4601 char *buf)
4602 {
4603 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4604 ssize_t count = 0;
4605
4606 if (!rbd_dev->parent)
4607 return sprintf(buf, "(no parent image)\n");
4608
4609 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4610 struct rbd_spec *spec = rbd_dev->parent_spec;
4611
4612 count += sprintf(&buf[count], "%s"
4613 "pool_id %llu\npool_name %s\n"
4614 "image_id %s\nimage_name %s\n"
4615 "snap_id %llu\nsnap_name %s\n"
4616 "overlap %llu\n",
4617 !count ? "" : "\n", /* first? */
4618 spec->pool_id, spec->pool_name,
4619 spec->image_id, spec->image_name ?: "(unknown)",
4620 spec->snap_id, spec->snap_name,
4621 rbd_dev->parent_overlap);
4622 }
4623
4624 return count;
4625 }
4626
4627 static ssize_t rbd_image_refresh(struct device *dev,
4628 struct device_attribute *attr,
4629 const char *buf,
4630 size_t size)
4631 {
4632 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4633 int ret;
4634
4635 ret = rbd_dev_refresh(rbd_dev);
4636 if (ret)
4637 return ret;
4638
4639 return size;
4640 }
4641
4642 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4643 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4644 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4645 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4646 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4647 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4648 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4649 static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4650 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4651 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4652 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4653 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4654 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4655 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4656 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4657 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4658
4659 static struct attribute *rbd_attrs[] = {
4660 &dev_attr_size.attr,
4661 &dev_attr_features.attr,
4662 &dev_attr_major.attr,
4663 &dev_attr_minor.attr,
4664 &dev_attr_client_addr.attr,
4665 &dev_attr_client_id.attr,
4666 &dev_attr_cluster_fsid.attr,
4667 &dev_attr_config_info.attr,
4668 &dev_attr_pool.attr,
4669 &dev_attr_pool_id.attr,
4670 &dev_attr_name.attr,
4671 &dev_attr_image_id.attr,
4672 &dev_attr_current_snap.attr,
4673 &dev_attr_snap_id.attr,
4674 &dev_attr_parent.attr,
4675 &dev_attr_refresh.attr,
4676 NULL
4677 };
4678
4679 static struct attribute_group rbd_attr_group = {
4680 .attrs = rbd_attrs,
4681 };
4682
4683 static const struct attribute_group *rbd_attr_groups[] = {
4684 &rbd_attr_group,
4685 NULL
4686 };
4687
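/*
 * These attributes appear under /sys/bus/rbd/devices/<id>/ (see
 * Documentation/ABI/testing/sysfs-bus-rbd). Illustrative use:
 *
 *	$ cat /sys/bus/rbd/devices/0/size
 *	10737418240
 *	$ echo 1 > /sys/bus/rbd/devices/0/refresh
 */
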
4688 static void rbd_dev_release(struct device *dev);
4689
4690 static const struct device_type rbd_device_type = {
4691 .name = "rbd",
4692 .groups = rbd_attr_groups,
4693 .release = rbd_dev_release,
4694 };
4695
4696 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4697 {
4698 kref_get(&spec->kref);
4699
4700 return spec;
4701 }
4702
4703 static void rbd_spec_free(struct kref *kref);
4704 static void rbd_spec_put(struct rbd_spec *spec)
4705 {
4706 if (spec)
4707 kref_put(&spec->kref, rbd_spec_free);
4708 }
4709
4710 static struct rbd_spec *rbd_spec_alloc(void)
4711 {
4712 struct rbd_spec *spec;
4713
4714 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4715 if (!spec)
4716 return NULL;
4717
4718 spec->pool_id = CEPH_NOPOOL;
4719 spec->snap_id = CEPH_NOSNAP;
4720 kref_init(&spec->kref);
4721
4722 return spec;
4723 }
4724
4725 static void rbd_spec_free(struct kref *kref)
4726 {
4727 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4728
4729 kfree(spec->pool_name);
4730 kfree(spec->image_id);
4731 kfree(spec->image_name);
4732 kfree(spec->snap_name);
4733 kfree(spec);
4734 }
4735
4736 static void rbd_dev_free(struct rbd_device *rbd_dev)
4737 {
4738 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4739 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4740
4741 ceph_oid_destroy(&rbd_dev->header_oid);
4742 ceph_oloc_destroy(&rbd_dev->header_oloc);
4743 kfree(rbd_dev->config_info);
4744
4745 rbd_put_client(rbd_dev->rbd_client);
4746 rbd_spec_put(rbd_dev->spec);
4747 kfree(rbd_dev->opts);
4748 kfree(rbd_dev);
4749 }
4750
4751 static void rbd_dev_release(struct device *dev)
4752 {
4753 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4754 bool need_put = !!rbd_dev->opts;
4755
4756 if (need_put) {
4757 destroy_workqueue(rbd_dev->task_wq);
4758 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4759 }
4760
4761 rbd_dev_free(rbd_dev);
4762
4763 /*
4764 * This is racy, but way better than putting the module ref
4765 * outside of the release callback. The race window is pretty
4766 * small, so doing something similar to dm (dm-builtin.c) is overkill.
4767 */
4768 if (need_put)
4769 module_put(THIS_MODULE);
4770 }
4771
4772 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4773 struct rbd_spec *spec)
4774 {
4775 struct rbd_device *rbd_dev;
4776
4777 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4778 if (!rbd_dev)
4779 return NULL;
4780
4781 spin_lock_init(&rbd_dev->lock);
4782 INIT_LIST_HEAD(&rbd_dev->node);
4783 init_rwsem(&rbd_dev->header_rwsem);
4784
4785 rbd_dev->header.data_pool_id = CEPH_NOPOOL;
4786 ceph_oid_init(&rbd_dev->header_oid);
4787 rbd_dev->header_oloc.pool = spec->pool_id;
4788
4789 mutex_init(&rbd_dev->watch_mutex);
4790 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4791 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4792
4793 init_rwsem(&rbd_dev->lock_rwsem);
4794 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4795 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4796 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4797 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4798 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4799 init_waitqueue_head(&rbd_dev->lock_waitq);
4800
4801 rbd_dev->dev.bus = &rbd_bus_type;
4802 rbd_dev->dev.type = &rbd_device_type;
4803 rbd_dev->dev.parent = &rbd_root_dev;
4804 device_initialize(&rbd_dev->dev);
4805
4806 rbd_dev->rbd_client = rbdc;
4807 rbd_dev->spec = spec;
4808
4809 return rbd_dev;
4810 }
4811
4812 /*
4813 * Create a mapping rbd_dev.
4814 */
4815 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4816 struct rbd_spec *spec,
4817 struct rbd_options *opts)
4818 {
4819 struct rbd_device *rbd_dev;
4820
4821 rbd_dev = __rbd_dev_create(rbdc, spec);
4822 if (!rbd_dev)
4823 return NULL;
4824
4825 rbd_dev->opts = opts;
4826
4827 /* get an id and fill in device name */
4828 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4829 minor_to_rbd_dev_id(1 << MINORBITS),
4830 GFP_KERNEL);
4831 if (rbd_dev->dev_id < 0)
4832 goto fail_rbd_dev;
4833
4834 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4835 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4836 rbd_dev->name);
4837 if (!rbd_dev->task_wq)
4838 goto fail_dev_id;
4839
4840 /* we have a ref from do_rbd_add() */
4841 __module_get(THIS_MODULE);
4842
4843 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4844 return rbd_dev;
4845
4846 fail_dev_id:
4847 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4848 fail_rbd_dev:
4849 rbd_dev_free(rbd_dev);
4850 return NULL;
4851 }
4852
4853 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4854 {
4855 if (rbd_dev)
4856 put_device(&rbd_dev->dev);
4857 }
4858
4859 /*
4860 * Get the size and object order for an image snapshot, or if
4861 * snap_id is CEPH_NOSNAP, gets this information for the base
4862 * image.
4863 */
4864 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4865 u8 *order, u64 *snap_size)
4866 {
4867 __le64 snapid = cpu_to_le64(snap_id);
4868 int ret;
4869 struct {
4870 u8 order;
4871 __le64 size;
4872 } __attribute__ ((packed)) size_buf = { 0 };
4873
4874 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4875 &rbd_dev->header_oloc, "get_size",
4876 &snapid, sizeof(snapid),
4877 &size_buf, sizeof(size_buf));
4878 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4879 if (ret < 0)
4880 return ret;
4881 if (ret < sizeof (size_buf))
4882 return -ERANGE;
4883
4884 if (order) {
4885 *order = size_buf.order;
4886 dout(" order %u", (unsigned int)*order);
4887 }
4888 *snap_size = le64_to_cpu(size_buf.size);
4889
4890 dout(" snap_id 0x%016llx snap_size = %llu\n",
4891 (unsigned long long)snap_id,
4892 (unsigned long long)*snap_size);
4893
4894 return 0;
4895 }
4896
4897 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4898 {
4899 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4900 &rbd_dev->header.obj_order,
4901 &rbd_dev->header.image_size);
4902 }
4903
4904 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4905 {
4906 void *reply_buf;
4907 int ret;
4908 void *p;
4909
4910 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4911 if (!reply_buf)
4912 return -ENOMEM;
4913
4914 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4915 &rbd_dev->header_oloc, "get_object_prefix",
4916 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4917 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4918 if (ret < 0)
4919 goto out;
4920
4921 p = reply_buf;
4922 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4923 p + ret, NULL, GFP_NOIO);
4924 ret = 0;
4925
4926 if (IS_ERR(rbd_dev->header.object_prefix)) {
4927 ret = PTR_ERR(rbd_dev->header.object_prefix);
4928 rbd_dev->header.object_prefix = NULL;
4929 } else {
4930 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
4931 }
4932 out:
4933 kfree(reply_buf);
4934
4935 return ret;
4936 }
4937
4938 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4939 u64 *snap_features)
4940 {
4941 __le64 snapid = cpu_to_le64(snap_id);
4942 struct {
4943 __le64 features;
4944 __le64 incompat;
4945 } __attribute__ ((packed)) features_buf = { 0 };
4946 u64 unsup;
4947 int ret;
4948
4949 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4950 &rbd_dev->header_oloc, "get_features",
4951 &snapid, sizeof(snapid),
4952 &features_buf, sizeof(features_buf));
4953 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4954 if (ret < 0)
4955 return ret;
4956 if (ret < sizeof (features_buf))
4957 return -ERANGE;
4958
4959 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4960 if (unsup) {
4961 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4962 unsup);
4963 return -ENXIO;
4964 }
4965
4966 *snap_features = le64_to_cpu(features_buf.features);
4967
4968 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4969 (unsigned long long)snap_id,
4970 (unsigned long long)*snap_features,
4971 (unsigned long long)le64_to_cpu(features_buf.incompat));
4972
4973 return 0;
4974 }
4975
4976 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4977 {
4978 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4979 &rbd_dev->header.features);
4980 }
4981
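/*
 * Fetch parent image info (pool id, image id, snap id, overlap) for
 * a v2 image via the "get_parent" method. Also detects the clone
 * having been flattened, in which case the parent is dropped.
 */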
4982 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4983 {
4984 struct rbd_spec *parent_spec;
4985 size_t size;
4986 void *reply_buf = NULL;
4987 __le64 snapid;
4988 void *p;
4989 void *end;
4990 u64 pool_id;
4991 char *image_id;
4992 u64 snap_id;
4993 u64 overlap;
4994 int ret;
4995
4996 parent_spec = rbd_spec_alloc();
4997 if (!parent_spec)
4998 return -ENOMEM;
4999
5000 size = sizeof (__le64) + /* pool_id */
5001 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
5002 sizeof (__le64) + /* snap_id */
5003 sizeof (__le64); /* overlap */
5004 reply_buf = kmalloc(size, GFP_KERNEL);
5005 if (!reply_buf) {
5006 ret = -ENOMEM;
5007 goto out_err;
5008 }
5009
5010 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5011 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5012 &rbd_dev->header_oloc, "get_parent",
5013 &snapid, sizeof(snapid), reply_buf, size);
5014 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5015 if (ret < 0)
5016 goto out_err;
5017
5018 p = reply_buf;
5019 end = reply_buf + ret;
5020 ret = -ERANGE;
5021 ceph_decode_64_safe(&p, end, pool_id, out_err);
5022 if (pool_id == CEPH_NOPOOL) {
5023 /*
5024 * Either the parent never existed, or we have a
5025 * record of it but the image got flattened so it no
5026 * longer has a parent. When the parent of a
5027 * layered image disappears we immediately set the
5028 * overlap to 0. The effect of this is that all new
5029 * requests will be treated as if the image had no
5030 * parent.
5031 */
5032 if (rbd_dev->parent_overlap) {
5033 rbd_dev->parent_overlap = 0;
5034 rbd_dev_parent_put(rbd_dev);
5035 pr_info("%s: clone image has been flattened\n",
5036 rbd_dev->disk->disk_name);
5037 }
5038
5039 goto out; /* No parent? No problem. */
5040 }
5041
5042 /* The ceph file layout needs to fit pool id in 32 bits */
5043
5044 ret = -EIO;
5045 if (pool_id > (u64)U32_MAX) {
5046 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5047 (unsigned long long)pool_id, U32_MAX);
5048 goto out_err;
5049 }
5050
5051 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5052 if (IS_ERR(image_id)) {
5053 ret = PTR_ERR(image_id);
5054 goto out_err;
5055 }
5056 ceph_decode_64_safe(&p, end, snap_id, out_err);
5057 ceph_decode_64_safe(&p, end, overlap, out_err);
5058
5059 /*
5060 * The parent won't change (except when the clone is
5061 * flattened, which we already handled). So we only need to
5062 * record the parent spec if we have not already done so.
5063 */
5064 if (!rbd_dev->parent_spec) {
5065 parent_spec->pool_id = pool_id;
5066 parent_spec->image_id = image_id;
5067 parent_spec->snap_id = snap_id;
5068 rbd_dev->parent_spec = parent_spec;
5069 parent_spec = NULL; /* rbd_dev now owns this */
5070 } else {
5071 kfree(image_id);
5072 }
5073
5074 /*
5075 * We always update the parent overlap. If it's zero we issue
5076 * a warning, as we will proceed as if there was no parent.
5077 */
5078 if (!overlap) {
5079 if (parent_spec) {
5080 /* refresh, careful to warn just once */
5081 if (rbd_dev->parent_overlap)
5082 rbd_warn(rbd_dev,
5083 "clone now standalone (overlap became 0)");
5084 } else {
5085 /* initial probe */
5086 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5087 }
5088 }
5089 rbd_dev->parent_overlap = overlap;
5090
5091 out:
5092 ret = 0;
5093 out_err:
5094 kfree(reply_buf);
5095 rbd_spec_put(parent_spec);
5096
5097 return ret;
5098 }
5099
5100 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5101 {
5102 struct {
5103 __le64 stripe_unit;
5104 __le64 stripe_count;
5105 } __attribute__ ((packed)) striping_info_buf = { 0 };
5106 size_t size = sizeof (striping_info_buf);
5107 void *p;
5108 u64 obj_size;
5109 u64 stripe_unit;
5110 u64 stripe_count;
5111 int ret;
5112
5113 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5114 &rbd_dev->header_oloc, "get_stripe_unit_count",
5115 NULL, 0, &striping_info_buf, size);
5116 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5117 if (ret < 0)
5118 return ret;
5119 if (ret < size)
5120 return -ERANGE;
5121
5122 /*
5123 * We don't actually support the "fancy striping" feature
5124 * (STRIPINGV2) yet, but if the striping sizes are the
5125 * defaults the behavior is the same as before. So find
5126 * out, and only fail if the image has non-default values.
5127 */
5128 ret = -EINVAL;
5129 obj_size = rbd_obj_bytes(&rbd_dev->header);
5130 p = &striping_info_buf;
5131 stripe_unit = ceph_decode_64(&p);
5132 if (stripe_unit != obj_size) {
5133 rbd_warn(rbd_dev, "unsupported stripe unit "
5134 "(got %llu want %llu)",
5135 stripe_unit, obj_size);
5136 return -EINVAL;
5137 }
5138 stripe_count = ceph_decode_64(&p);
5139 if (stripe_count != 1) {
5140 rbd_warn(rbd_dev, "unsupported stripe count "
5141 "(got %llu want 1)", stripe_count);
5142 return -EINVAL;
5143 }
5144 rbd_dev->header.stripe_unit = stripe_unit;
5145 rbd_dev->header.stripe_count = stripe_count;
5146
5147 return 0;
5148 }
5149
5150 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5151 {
5152 __le64 data_pool_id;
5153 int ret;
5154
5155 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5156 &rbd_dev->header_oloc, "get_data_pool",
5157 NULL, 0, &data_pool_id, sizeof(data_pool_id));
5158 if (ret < 0)
5159 return ret;
5160 if (ret < sizeof(data_pool_id))
5161 return -EBADMSG;
5162
5163 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5164 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5165 return 0;
5166 }
5167
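/*
 * Look up an image's name given its id, using the "dir_get_name"
 * method on the pool's rbd directory object. Name lookup is
 * best-effort: returns NULL on any failure.
 */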
5168 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5169 {
5170 CEPH_DEFINE_OID_ONSTACK(oid);
5171 size_t image_id_size;
5172 char *image_id;
5173 void *p;
5174 void *end;
5175 size_t size;
5176 void *reply_buf = NULL;
5177 size_t len = 0;
5178 char *image_name = NULL;
5179 int ret;
5180
5181 rbd_assert(!rbd_dev->spec->image_name);
5182
5183 len = strlen(rbd_dev->spec->image_id);
5184 image_id_size = sizeof (__le32) + len;
5185 image_id = kmalloc(image_id_size, GFP_KERNEL);
5186 if (!image_id)
5187 return NULL;
5188
5189 p = image_id;
5190 end = image_id + image_id_size;
5191 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5192
5193 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5194 reply_buf = kmalloc(size, GFP_KERNEL);
5195 if (!reply_buf)
5196 goto out;
5197
5198 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5199 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5200 "dir_get_name", image_id, image_id_size,
5201 reply_buf, size);
5202 if (ret < 0)
5203 goto out;
5204 p = reply_buf;
5205 end = reply_buf + ret;
5206
5207 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5208 if (IS_ERR(image_name))
5209 image_name = NULL;
5210 else
5211 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5212 out:
5213 kfree(reply_buf);
5214 kfree(image_id);
5215
5216 return image_name;
5217 }
5218
5219 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5220 {
5221 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5222 const char *snap_name;
5223 u32 which = 0;
5224
5225 /* Skip over names until we find the one we are looking for */
5226
5227 snap_name = rbd_dev->header.snap_names;
5228 while (which < snapc->num_snaps) {
5229 if (!strcmp(name, snap_name))
5230 return snapc->snaps[which];
5231 snap_name += strlen(snap_name) + 1;
5232 which++;
5233 }
5234 return CEPH_NOSNAP;
5235 }
5236
5237 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5238 {
5239 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5240 u32 which;
5241 bool found = false;
5242 u64 snap_id;
5243
5244 for (which = 0; !found && which < snapc->num_snaps; which++) {
5245 const char *snap_name;
5246
5247 snap_id = snapc->snaps[which];
5248 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5249 if (IS_ERR(snap_name)) {
5250 /* ignore no-longer existing snapshots */
5251 if (PTR_ERR(snap_name) == -ENOENT)
5252 continue;
5253 else
5254 break;
5255 }
5256 found = !strcmp(name, snap_name);
5257 kfree(snap_name);
5258 }
5259 return found ? snap_id : CEPH_NOSNAP;
5260 }
5261
5262 /*
5263 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5264 * no snapshot by that name is found, or if an error occurs.
5265 */
5266 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5267 {
5268 if (rbd_dev->image_format == 1)
5269 return rbd_v1_snap_id_by_name(rbd_dev, name);
5270
5271 return rbd_v2_snap_id_by_name(rbd_dev, name);
5272 }
5273
5274 /*
5275 * An image being mapped will have everything but the snap id.
5276 */
5277 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5278 {
5279 struct rbd_spec *spec = rbd_dev->spec;
5280
5281 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5282 rbd_assert(spec->image_id && spec->image_name);
5283 rbd_assert(spec->snap_name);
5284
5285 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5286 u64 snap_id;
5287
5288 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5289 if (snap_id == CEPH_NOSNAP)
5290 return -ENOENT;
5291
5292 spec->snap_id = snap_id;
5293 } else {
5294 spec->snap_id = CEPH_NOSNAP;
5295 }
5296
5297 return 0;
5298 }
5299
5300 /*
5301 * A parent image will have all ids but none of the names.
5302 *
5303 * All names in an rbd spec are dynamically allocated. It's OK if we
5304 * can't figure out the name for an image id.
5305 */
5306 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5307 {
5308 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5309 struct rbd_spec *spec = rbd_dev->spec;
5310 const char *pool_name;
5311 const char *image_name;
5312 const char *snap_name;
5313 int ret;
5314
5315 rbd_assert(spec->pool_id != CEPH_NOPOOL);
5316 rbd_assert(spec->image_id);
5317 rbd_assert(spec->snap_id != CEPH_NOSNAP);
5318
5319 /* Get the pool name; we have to make our own copy of this */
5320
5321 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5322 if (!pool_name) {
5323 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5324 return -EIO;
5325 }
5326 pool_name = kstrdup(pool_name, GFP_KERNEL);
5327 if (!pool_name)
5328 return -ENOMEM;
5329
5330 /* Fetch the image name; tolerate failure here */
5331
5332 image_name = rbd_dev_image_name(rbd_dev);
5333 if (!image_name)
5334 rbd_warn(rbd_dev, "unable to get image name");
5335
5336 /* Fetch the snapshot name */
5337
5338 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5339 if (IS_ERR(snap_name)) {
5340 ret = PTR_ERR(snap_name);
5341 goto out_err;
5342 }
5343
5344 spec->pool_name = pool_name;
5345 spec->image_name = image_name;
5346 spec->snap_name = snap_name;
5347
5348 return 0;
5349
5350 out_err:
5351 kfree(image_name);
5352 kfree(pool_name);
5353 return ret;
5354 }
5355
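/*
 * Fetch the image's snapshot context (seq plus snapshot id array)
 * via the "get_snapcontext" method and install it in the header.
 */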
5356 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5357 {
5358 size_t size;
5359 int ret;
5360 void *reply_buf;
5361 void *p;
5362 void *end;
5363 u64 seq;
5364 u32 snap_count;
5365 struct ceph_snap_context *snapc;
5366 u32 i;
5367
5368 /*
5369 * We'll need room for the seq value (maximum snapshot id),
5370 * snapshot count, and array of that many snapshot ids.
5371 * For now we have a fixed upper limit on the number we're
5372 * prepared to receive.
5373 */
5374 size = sizeof (__le64) + sizeof (__le32) +
5375 RBD_MAX_SNAP_COUNT * sizeof (__le64);
5376 reply_buf = kzalloc(size, GFP_KERNEL);
5377 if (!reply_buf)
5378 return -ENOMEM;
5379
5380 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5381 &rbd_dev->header_oloc, "get_snapcontext",
5382 NULL, 0, reply_buf, size);
5383 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5384 if (ret < 0)
5385 goto out;
5386
5387 p = reply_buf;
5388 end = reply_buf + ret;
5389 ret = -ERANGE;
5390 ceph_decode_64_safe(&p, end, seq, out);
5391 ceph_decode_32_safe(&p, end, snap_count, out);
5392
5393 /*
5394 * Make sure the reported number of snapshot ids wouldn't go
5395 * beyond the end of our buffer. But before checking that,
5396 * make sure the computed size of the snapshot context we
5397 * allocate is representable in a size_t.
5398 */
5399 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5400 / sizeof (u64)) {
5401 ret = -EINVAL;
5402 goto out;
5403 }
5404 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5405 goto out;
5406 ret = 0;
5407
5408 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5409 if (!snapc) {
5410 ret = -ENOMEM;
5411 goto out;
5412 }
5413 snapc->seq = seq;
5414 for (i = 0; i < snap_count; i++)
5415 snapc->snaps[i] = ceph_decode_64(&p);
5416
5417 ceph_put_snap_context(rbd_dev->header.snapc);
5418 rbd_dev->header.snapc = snapc;
5419
5420 dout(" snap context seq = %llu, snap_count = %u\n",
5421 (unsigned long long)seq, (unsigned int)snap_count);
5422 out:
5423 kfree(reply_buf);
5424
5425 return ret;
5426 }
5427
5428 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5429 u64 snap_id)
5430 {
5431 size_t size;
5432 void *reply_buf;
5433 __le64 snapid;
5434 int ret;
5435 void *p;
5436 void *end;
5437 char *snap_name;
5438
5439 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5440 reply_buf = kmalloc(size, GFP_KERNEL);
5441 if (!reply_buf)
5442 return ERR_PTR(-ENOMEM);
5443
5444 snapid = cpu_to_le64(snap_id);
5445 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5446 &rbd_dev->header_oloc, "get_snapshot_name",
5447 &snapid, sizeof(snapid), reply_buf, size);
5448 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5449 if (ret < 0) {
5450 snap_name = ERR_PTR(ret);
5451 goto out;
5452 }
5453
5454 p = reply_buf;
5455 end = reply_buf + ret;
5456 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5457 if (IS_ERR(snap_name))
5458 goto out;
5459
5460 dout(" snap_id 0x%016llx snap_name = %s\n",
5461 (unsigned long long)snap_id, snap_name);
5462 out:
5463 kfree(reply_buf);
5464
5465 return snap_name;
5466 }
5467
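/*
 * Read the v2 header: the image size on every call, the once-only
 * fields via rbd_dev_v2_header_onetime() on the first call, and the
 * snapshot context.
 */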
5468 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5469 {
5470 bool first_time = rbd_dev->header.object_prefix == NULL;
5471 int ret;
5472
5473 ret = rbd_dev_v2_image_size(rbd_dev);
5474 if (ret)
5475 return ret;
5476
5477 if (first_time) {
5478 ret = rbd_dev_v2_header_onetime(rbd_dev);
5479 if (ret)
5480 return ret;
5481 }
5482
5483 ret = rbd_dev_v2_snap_context(rbd_dev);
5484 if (ret && first_time) {
5485 kfree(rbd_dev->header.object_prefix);
5486 rbd_dev->header.object_prefix = NULL;
5487 }
5488
5489 return ret;
5490 }
5491
5492 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5493 {
5494 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5495
5496 if (rbd_dev->image_format == 1)
5497 return rbd_dev_v1_header_info(rbd_dev);
5498
5499 return rbd_dev_v2_header_info(rbd_dev);
5500 }
5501
5502 /*
5503 * Skips over white space at *buf, and updates *buf to point to the
5504 * first found non-space character (if any). Returns the length of
5505 * the token (string of non-white space characters) found. Note
5506 * that *buf must be terminated with '\0'.
5507 */
5508 static inline size_t next_token(const char **buf)
5509 {
5510 /*
5511 * These are the characters that produce nonzero for
5512 * isspace() in the "C" and "POSIX" locales.
5513 */
5514 const char *spaces = " \f\n\r\t\v";
5515
5516 *buf += strspn(*buf, spaces); /* Find start of token */
5517
5518 return strcspn(*buf, spaces); /* Return token length */
5519 }
5520
5521 /*
5522 * Finds the next token in *buf, dynamically allocates a buffer big
5523 * enough to hold a copy of it, and copies the token into the new
5524 * buffer. The copy is guaranteed to be terminated with '\0'. Note
5525 * that a duplicate buffer is created even for a zero-length token.
5526 *
5527 * Returns a pointer to the newly-allocated duplicate, or a null
5528 * pointer if memory for the duplicate was not available. If
5529 * the lenp argument is a non-null pointer, the length of the token
5530 * (not including the '\0') is returned in *lenp.
5531 *
5532 * If successful, the *buf pointer will be updated to point beyond
5533 * the end of the found token.
5534 *
5535 * Note: uses GFP_KERNEL for allocation.
5536 */
5537 static inline char *dup_token(const char **buf, size_t *lenp)
5538 {
5539 char *dup;
5540 size_t len;
5541
5542 len = next_token(buf);
5543 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5544 if (!dup)
5545 return NULL;
5546 *(dup + len) = '\0';
5547 *buf += len;
5548
5549 if (lenp)
5550 *lenp = len;
5551
5552 return dup;
5553 }
5554
5555 /*
5556 * Parse the options provided for an "rbd add" (i.e., rbd image
5557 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
5558 * and the data written is passed here via a NUL-terminated buffer.
5559 * Returns 0 if successful or an error code otherwise.
5560 *
5561 * The information extracted from these options is recorded in
5562 * the other parameters which return dynamically-allocated
5563 * structures:
5564 * ceph_opts
5565 * The address of a pointer that will refer to a ceph options
5566 * structure. Caller must release the returned pointer using
5567 * ceph_destroy_options() when it is no longer needed.
5568 * rbd_opts
5569 * Address of an rbd options pointer. Fully initialized by
5570 * this function; caller must release with kfree().
5571 * spec
5572 * Address of an rbd image specification pointer. Fully
5573 * initialized by this function based on parsed options.
5574 * Caller must release with rbd_spec_put().
5575 *
5576 * The options passed take this form:
5577 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
5578 * where:
5579 * <mon_addrs>
5580 * A comma-separated list of one or more monitor addresses.
5581 * A monitor address is an ip address, optionally followed
5582 * by a port number (separated by a colon).
5583 * I.e.: ip1[:port1][,ip2[:port2]...]
5584 * <options>
5585 * A comma-separated list of ceph and/or rbd options.
5586 * <pool_name>
5587 * The name of the rados pool containing the rbd image.
5588 * <image_name>
5589 * The name of the image in that pool to map.
5590 * <snap_name>
5591 * An optional snapshot name. If provided, the mapping will
5592 * present data from the image at the time that snapshot was
5593 * created. The image head is used if no snapshot name is
5594 * provided. Snapshot mappings are always read-only.
5595 */
5596 static int rbd_add_parse_args(const char *buf,
5597 struct ceph_options **ceph_opts,
5598 struct rbd_options **opts,
5599 struct rbd_spec **rbd_spec)
5600 {
5601 size_t len;
5602 char *options;
5603 const char *mon_addrs;
5604 char *snap_name;
5605 size_t mon_addrs_size;
5606 struct rbd_spec *spec = NULL;
5607 struct rbd_options *rbd_opts = NULL;
5608 struct ceph_options *copts;
5609 int ret;
5610
5611 /* The first four tokens are required */
5612
5613 len = next_token(&buf);
5614 if (!len) {
5615 rbd_warn(NULL, "no monitor address(es) provided");
5616 return -EINVAL;
5617 }
5618 mon_addrs = buf;
5619 mon_addrs_size = len + 1;
5620 buf += len;
5621
5622 ret = -EINVAL;
5623 options = dup_token(&buf, NULL);
5624 if (!options)
5625 return -ENOMEM;
5626 if (!*options) {
5627 rbd_warn(NULL, "no options provided");
5628 goto out_err;
5629 }
5630
5631 spec = rbd_spec_alloc();
5632 if (!spec)
5633 goto out_mem;
5634
5635 spec->pool_name = dup_token(&buf, NULL);
5636 if (!spec->pool_name)
5637 goto out_mem;
5638 if (!*spec->pool_name) {
5639 rbd_warn(NULL, "no pool name provided");
5640 goto out_err;
5641 }
5642
5643 spec->image_name = dup_token(&buf, NULL);
5644 if (!spec->image_name)
5645 goto out_mem;
5646 if (!*spec->image_name) {
5647 rbd_warn(NULL, "no image name provided");
5648 goto out_err;
5649 }
5650
5651 /*
5652 * Snapshot name is optional; default is to use "-"
5653 * (indicating the head/no snapshot).
5654 */
5655 len = next_token(&buf);
5656 if (!len) {
5657 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5658 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5659 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5660 ret = -ENAMETOOLONG;
5661 goto out_err;
5662 }
5663 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5664 if (!snap_name)
5665 goto out_mem;
5666 *(snap_name + len) = '\0';
5667 spec->snap_name = snap_name;
5668
5669 /* Initialize all rbd options to the defaults */
5670
5671 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5672 if (!rbd_opts)
5673 goto out_mem;
5674
5675 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5676 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5677 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5678 rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
5679
5680 copts = ceph_parse_options(options, mon_addrs,
5681 mon_addrs + mon_addrs_size - 1,
5682 parse_rbd_opts_token, rbd_opts);
5683 if (IS_ERR(copts)) {
5684 ret = PTR_ERR(copts);
5685 goto out_err;
5686 }
5687 kfree(options);
5688
5689 *ceph_opts = copts;
5690 *opts = rbd_opts;
5691 *rbd_spec = spec;
5692
5693 return 0;
5694 out_mem:
5695 ret = -ENOMEM;
5696 out_err:
5697 kfree(rbd_opts);
5698 rbd_spec_put(spec);
5699 kfree(options);
5700
5701 return ret;
5702 }
5703
5704 /*
5705 * Return pool id (>= 0) or a negative error code.
5706 */
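/*
 * If the pool is not in the locally cached osdmap, ask the monitors
 * for the newest osdmap epoch and, when ours is older, wait for the
 * updated map and retry the lookup once before giving up.
 */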
5707 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5708 {
5709 struct ceph_options *opts = rbdc->client->options;
5710 u64 newest_epoch;
5711 int tries = 0;
5712 int ret;
5713
5714 again:
5715 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5716 if (ret == -ENOENT && tries++ < 1) {
5717 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5718 &newest_epoch);
5719 if (ret < 0)
5720 return ret;
5721
5722 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5723 ceph_osdc_maybe_request_map(&rbdc->client->osdc);
5724 (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5725 newest_epoch,
5726 opts->mount_timeout);
5727 goto again;
5728 } else {
5729 /* the osdmap we have is new enough */
5730 return -ENOENT;
5731 }
5732 }
5733
5734 return ret;
5735 }
5736
5737 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5738 {
5739 down_write(&rbd_dev->lock_rwsem);
5740 if (__rbd_is_lock_owner(rbd_dev))
5741 rbd_unlock(rbd_dev);
5742 up_write(&rbd_dev->lock_rwsem);
5743 }
5744
5745 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5746 {
5747 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5748 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5749 return -EINVAL;
5750 }
5751
5752 /* FIXME: "rbd map --exclusive" should be interruptible */
5753 down_read(&rbd_dev->lock_rwsem);
5754 rbd_wait_state_locked(rbd_dev);
5755 up_read(&rbd_dev->lock_rwsem);
5756 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
5757 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5758 return -EROFS;
5759 }
5760
5761 return 0;
5762 }
5763
5764 /*
5765 * An rbd format 2 image has a unique identifier, distinct from the
5766 * name given to it by the user. Internally, that identifier is
5767 * what's used to specify the names of objects related to the image.
5768 *
5769 * A special "rbd id" object is used to map an rbd image name to its
5770 * id. If that object doesn't exist, then there is no v2 rbd image
5771 * with the supplied name.
5772 *
5773 * This function will record the given rbd_dev's image_id field if
5774 * it can be determined, and in that case will return 0. If any
5775 * errors occur a negative errno will be returned and the rbd_dev's
5776 * image_id field will be unchanged (and should be NULL).
5777 */
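/*
 * Illustrative naming, assuming the RBD_ID_PREFIX value ("rbd_id.")
 * from rbd_types.h: the id of a v2 image named "foo" is stored in an
 * object called "rbd_id.foo".
 */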
5778 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5779 {
5780 int ret;
5781 size_t size;
5782 CEPH_DEFINE_OID_ONSTACK(oid);
5783 void *response;
5784 char *image_id;
5785
5786 /*
5787 * When probing a parent image, the image id is already
5788 * known (and the image name likely is not). There's no
5789 * need to fetch the image id again in this case. We
5790 * do still need to set the image format though.
5791 */
5792 if (rbd_dev->spec->image_id) {
5793 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5794
5795 return 0;
5796 }
5797
5798 /*
5799 * First, see if the format 2 image id file exists, and if
5800 * so, get the image's persistent id from it.
5801 */
5802 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5803 rbd_dev->spec->image_name);
5804 if (ret)
5805 return ret;
5806
5807 dout("rbd id object name is %s\n", oid.name);
5808
5809 /* Response will be an encoded string, which includes a length */
5810
5811 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5812 response = kzalloc(size, GFP_NOIO);
5813 if (!response) {
5814 ret = -ENOMEM;
5815 goto out;
5816 }
5817
5818 /* If it doesn't exist we'll assume it's a format 1 image */
5819
5820 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5821 "get_id", NULL, 0,
5822 response, RBD_IMAGE_ID_LEN_MAX);
5823 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5824 if (ret == -ENOENT) {
5825 image_id = kstrdup("", GFP_KERNEL);
5826 ret = image_id ? 0 : -ENOMEM;
5827 if (!ret)
5828 rbd_dev->image_format = 1;
5829 } else if (ret >= 0) {
5830 void *p = response;
5831
5832 image_id = ceph_extract_encoded_string(&p, p + ret,
5833 NULL, GFP_NOIO);
5834 ret = PTR_ERR_OR_ZERO(image_id);
5835 if (!ret)
5836 rbd_dev->image_format = 2;
5837 }
5838
5839 if (!ret) {
5840 rbd_dev->spec->image_id = image_id;
5841 dout("image_id is %s\n", image_id);
5842 }
5843 out:
5844 kfree(response);
5845 ceph_oid_destroy(&oid);
5846 return ret;
5847 }
5848
5849 /*
5850 * Undo whatever state changes are made by v1 or v2 header info
5851 * call.
5852 */
5853 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5854 {
5855 struct rbd_image_header *header;
5856
5857 rbd_dev_parent_put(rbd_dev);
5858
5859 /* Free dynamic fields from the header, then zero it out */
5860
5861 header = &rbd_dev->header;
5862 ceph_put_snap_context(header->snapc);
5863 kfree(header->snap_sizes);
5864 kfree(header->snap_names);
5865 kfree(header->object_prefix);
5866 memset(header, 0, sizeof (*header));
5867 }
5868
5869 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5870 {
5871 int ret;
5872
5873 ret = rbd_dev_v2_object_prefix(rbd_dev);
5874 if (ret)
5875 goto out_err;
5876
5877 /*
5878 * Get and check the features for the image. Currently the
5879 * features are assumed to never change.
5880 */
5881 ret = rbd_dev_v2_features(rbd_dev);
5882 if (ret)
5883 goto out_err;
5884
5885 /* If the image supports fancy striping, get its parameters */
5886
5887 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5888 ret = rbd_dev_v2_striping_info(rbd_dev);
5889 if (ret < 0)
5890 goto out_err;
5891 }
5892
5893 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5894 ret = rbd_dev_v2_data_pool(rbd_dev);
5895 if (ret)
5896 goto out_err;
5897 }
5898
5899 rbd_init_layout(rbd_dev);
5900 return 0;
5901
5902 out_err:
5903 rbd_dev->header.features = 0;
5904 kfree(rbd_dev->header.object_prefix);
5905 rbd_dev->header.object_prefix = NULL;
5906 return ret;
5907 }
5908
5909 /*
5910 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5911 * rbd_dev_image_probe() recursion depth, which means it's also the
5912 * length of the already discovered part of the parent chain.
5913 */
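/*
 * For example, when mapping a clone of a clone, the mapped image is
 * probed at depth 0, its parent at depth 1 and its grandparent at
 * depth 2; the probe fails with -EINVAL once the chain would exceed
 * RBD_MAX_PARENT_CHAIN_LEN.
 */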
5914 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5915 {
5916 struct rbd_device *parent = NULL;
5917 int ret;
5918
5919 if (!rbd_dev->parent_spec)
5920 return 0;
5921
5922 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5923 pr_info("parent chain is too long (%d)\n", depth);
5924 ret = -EINVAL;
5925 goto out_err;
5926 }
5927
5928 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5929 if (!parent) {
5930 ret = -ENOMEM;
5931 goto out_err;
5932 }
5933
5934 /*
5935 * Images related by parent/child relationships always share
5936 * rbd_client and spec/parent_spec, so bump their refcounts.
5937 */
5938 __rbd_get_client(rbd_dev->rbd_client);
5939 rbd_spec_get(rbd_dev->parent_spec);
5940
5941 ret = rbd_dev_image_probe(parent, depth);
5942 if (ret < 0)
5943 goto out_err;
5944
5945 rbd_dev->parent = parent;
5946 atomic_set(&rbd_dev->parent_ref, 1);
5947 return 0;
5948
5949 out_err:
5950 rbd_dev_unparent(rbd_dev);
5951 rbd_dev_destroy(parent);
5952 return ret;
5953 }
5954
5955 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5956 {
5957 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5958 rbd_dev_mapping_clear(rbd_dev);
5959 rbd_free_disk(rbd_dev);
5960 if (!single_major)
5961 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5962 }
5963
5964 /*
5965 * rbd_dev->header_rwsem must be locked for write and will be unlocked
5966 * upon return.
5967 */
5968 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5969 {
5970 int ret;
5971
5972 /* Record our major and minor device numbers. */
5973
5974 if (!single_major) {
5975 ret = register_blkdev(0, rbd_dev->name);
5976 if (ret < 0)
5977 goto err_out_unlock;
5978
5979 rbd_dev->major = ret;
5980 rbd_dev->minor = 0;
5981 } else {
5982 rbd_dev->major = rbd_major;
5983 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5984 }
5985
5986 /* Set up the blkdev mapping. */
5987
5988 ret = rbd_init_disk(rbd_dev);
5989 if (ret)
5990 goto err_out_blkdev;
5991
5992 ret = rbd_dev_mapping_set(rbd_dev);
5993 if (ret)
5994 goto err_out_disk;
5995
5996 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5997 set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
5998
5999 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6000 if (ret)
6001 goto err_out_mapping;
6002
6003 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6004 up_write(&rbd_dev->header_rwsem);
6005 return 0;
6006
6007 err_out_mapping:
6008 rbd_dev_mapping_clear(rbd_dev);
6009 err_out_disk:
6010 rbd_free_disk(rbd_dev);
6011 err_out_blkdev:
6012 if (!single_major)
6013 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6014 err_out_unlock:
6015 up_write(&rbd_dev->header_rwsem);
6016 return ret;
6017 }
6018
6019 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6020 {
6021 struct rbd_spec *spec = rbd_dev->spec;
6022 int ret;
6023
6024 /* Record the header object name for this rbd image. */
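/*
 * Illustrative names, assuming the RBD_SUFFIX (".rbd") and
 * RBD_HEADER_PREFIX ("rbd_header.") values from rbd_types.h: a v1
 * image "foo" uses header object "foo.rbd", while a v2 image with
 * id "1234" uses "rbd_header.1234".
 */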
6025
6026 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6027 if (rbd_dev->image_format == 1)
6028 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6029 spec->image_name, RBD_SUFFIX);
6030 else
6031 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6032 RBD_HEADER_PREFIX, spec->image_id);
6033
6034 return ret;
6035 }
6036
6037 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6038 {
6039 rbd_dev_unprobe(rbd_dev);
6040 if (rbd_dev->opts)
6041 rbd_unregister_watch(rbd_dev);
6042 rbd_dev->image_format = 0;
6043 kfree(rbd_dev->spec->image_id);
6044 rbd_dev->spec->image_id = NULL;
6045 }
6046
6047 /*
6048 * Probe for the existence of the header object for the given rbd
6049 * device. If this image is the one being mapped (i.e., not a
6050 * parent), initiate a watch on its header object before using that
6051 * object to get detailed information about the rbd image.
6052 */
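/*
 * The sequence below is: resolve the image id, derive the header
 * object name, register a watch (top-level mapping only), fetch the
 * header, resolve snapshot id/names, fetch parent info if the image
 * is layered, and finally recurse into the parent chain.
 */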
6053 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6054 {
6055 int ret;
6056
6057 /*
6058 * Get the id from the image id object. Unless there's an
6059 * error, rbd_dev->spec->image_id will be filled in with
6060 * a dynamically-allocated string, and rbd_dev->image_format
6061 * will be set to either 1 or 2.
6062 */
6063 ret = rbd_dev_image_id(rbd_dev);
6064 if (ret)
6065 return ret;
6066
6067 ret = rbd_dev_header_name(rbd_dev);
6068 if (ret)
6069 goto err_out_format;
6070
6071 if (!depth) {
6072 ret = rbd_register_watch(rbd_dev);
6073 if (ret) {
6074 if (ret == -ENOENT)
6075 pr_info("image %s/%s does not exist\n",
6076 rbd_dev->spec->pool_name,
6077 rbd_dev->spec->image_name);
6078 goto err_out_format;
6079 }
6080 }
6081
6082 ret = rbd_dev_header_info(rbd_dev);
6083 if (ret)
6084 goto err_out_watch;
6085
6086 /*
6087 * If this image is the one being mapped, we have pool name and
6088 * id, image name and id, and snap name - need to fill snap id.
6089 * Otherwise this is a parent image, identified by pool, image
6090 * and snap ids - need to fill in names for those ids.
6091 */
6092 if (!depth)
6093 ret = rbd_spec_fill_snap_id(rbd_dev);
6094 else
6095 ret = rbd_spec_fill_names(rbd_dev);
6096 if (ret) {
6097 if (ret == -ENOENT)
6098 pr_info("snap %s/%s@%s does not exist\n",
6099 rbd_dev->spec->pool_name,
6100 rbd_dev->spec->image_name,
6101 rbd_dev->spec->snap_name);
6102 goto err_out_probe;
6103 }
6104
6105 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6106 ret = rbd_dev_v2_parent_info(rbd_dev);
6107 if (ret)
6108 goto err_out_probe;
6109
6110 /*
6111 * Need to warn users if this image is the one being
6112 * mapped and has a parent.
6113 */
6114 if (!depth && rbd_dev->parent_spec)
6115 rbd_warn(rbd_dev,
6116 "WARNING: kernel layering is EXPERIMENTAL!");
6117 }
6118
6119 ret = rbd_dev_probe_parent(rbd_dev, depth);
6120 if (ret)
6121 goto err_out_probe;
6122
6123 dout("discovered format %u image, header name is %s\n",
6124 rbd_dev->image_format, rbd_dev->header_oid.name);
6125 return 0;
6126
6127 err_out_probe:
6128 rbd_dev_unprobe(rbd_dev);
6129 err_out_watch:
6130 if (!depth)
6131 rbd_unregister_watch(rbd_dev);
6132 err_out_format:
6133 rbd_dev->image_format = 0;
6134 kfree(rbd_dev->spec->image_id);
6135 rbd_dev->spec->image_id = NULL;
6136 return ret;
6137 }
6138
6139 static ssize_t do_rbd_add(struct bus_type *bus,
6140 const char *buf,
6141 size_t count)
6142 {
6143 struct rbd_device *rbd_dev = NULL;
6144 struct ceph_options *ceph_opts = NULL;
6145 struct rbd_options *rbd_opts = NULL;
6146 struct rbd_spec *spec = NULL;
6147 struct rbd_client *rbdc;
6148 bool read_only;
6149 int rc;
6150
6151 if (!try_module_get(THIS_MODULE))
6152 return -ENODEV;
6153
6154 /* parse add command */
6155 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
6156 if (rc < 0)
6157 goto out;
6158
6159 rbdc = rbd_get_client(ceph_opts);
6160 if (IS_ERR(rbdc)) {
6161 rc = PTR_ERR(rbdc);
6162 goto err_out_args;
6163 }
6164
6165 /* pick the pool */
6166 rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
6167 if (rc < 0) {
6168 if (rc == -ENOENT)
6169 pr_info("pool %s does not exist\n", spec->pool_name);
6170 goto err_out_client;
6171 }
6172 spec->pool_id = (u64)rc;
6173
6174 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
6175 if (!rbd_dev) {
6176 rc = -ENOMEM;
6177 goto err_out_client;
6178 }
6179 rbdc = NULL; /* rbd_dev now owns this */
6180 spec = NULL; /* rbd_dev now owns this */
6181 rbd_opts = NULL; /* rbd_dev now owns this */
6182
6183 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6184 if (!rbd_dev->config_info) {
6185 rc = -ENOMEM;
6186 goto err_out_rbd_dev;
6187 }
6188
6189 down_write(&rbd_dev->header_rwsem);
6190 rc = rbd_dev_image_probe(rbd_dev, 0);
6191 if (rc < 0) {
6192 up_write(&rbd_dev->header_rwsem);
6193 goto err_out_rbd_dev;
6194 }
6195
6196 /* If we are mapping a snapshot it must be marked read-only */
6197
6198 read_only = rbd_dev->opts->read_only;
6199 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6200 read_only = true;
6201 rbd_dev->mapping.read_only = read_only;
6202
6203 rc = rbd_dev_device_setup(rbd_dev);
6204 if (rc)
6205 goto err_out_image_probe;
6206
6207 if (rbd_dev->opts->exclusive) {
6208 rc = rbd_add_acquire_lock(rbd_dev);
6209 if (rc)
6210 goto err_out_device_setup;
6211 }
6212
6213 /* Everything's ready. Announce the disk to the world. */
6214
6215 rc = device_add(&rbd_dev->dev);
6216 if (rc)
6217 goto err_out_image_lock;
6218
6219 add_disk(rbd_dev->disk);
6220 /* see rbd_init_disk() */
6221 blk_put_queue(rbd_dev->disk->queue);
6222
6223 spin_lock(&rbd_dev_list_lock);
6224 list_add_tail(&rbd_dev->node, &rbd_dev_list);
6225 spin_unlock(&rbd_dev_list_lock);
6226
6227 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6228 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6229 rbd_dev->header.features);
6230 rc = count;
6231 out:
6232 module_put(THIS_MODULE);
6233 return rc;
6234
6235 err_out_image_lock:
6236 rbd_dev_image_unlock(rbd_dev);
6237 err_out_device_setup:
6238 rbd_dev_device_release(rbd_dev);
6239 err_out_image_probe:
6240 rbd_dev_image_release(rbd_dev);
6241 err_out_rbd_dev:
6242 rbd_dev_destroy(rbd_dev);
6243 err_out_client:
6244 rbd_put_client(rbdc);
6245 err_out_args:
6246 rbd_spec_put(spec);
6247 kfree(rbd_opts);
6248 goto out;
6249 }
6250
6251 static ssize_t rbd_add(struct bus_type *bus,
6252 const char *buf,
6253 size_t count)
6254 {
6255 if (single_major)
6256 return -EINVAL;
6257
6258 return do_rbd_add(bus, buf, count);
6259 }
6260
6261 static ssize_t rbd_add_single_major(struct bus_type *bus,
6262 const char *buf,
6263 size_t count)
6264 {
6265 return do_rbd_add(bus, buf, count);
6266 }
6267
6268 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6269 {
6270 while (rbd_dev->parent) {
6271 struct rbd_device *first = rbd_dev;
6272 struct rbd_device *second = first->parent;
6273 struct rbd_device *third;
6274
6275 /*
6276 * Walk down to the deepest parent (the one with no
6277 * grandparent) and remove it.
6278 */
6279 while (second && (third = second->parent)) {
6280 first = second;
6281 second = third;
6282 }
6283 rbd_assert(second);
6284 rbd_dev_image_release(second);
6285 rbd_dev_destroy(second);
6286 first->parent = NULL;
6287 first->parent_overlap = 0;
6288
6289 rbd_assert(first->parent_spec);
6290 rbd_spec_put(first->parent_spec);
6291 first->parent_spec = NULL;
6292 }
6293 }
6294
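/*
 * A usage sketch (the device id 0 is illustrative): writing "0" to
 * /sys/bus/rbd/remove unmaps device rbd0 provided it is not open;
 * writing "0 force" unmaps it even while open, after draining or
 * failing outstanding I/O.
 */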
6295 static ssize_t do_rbd_remove(struct bus_type *bus,
6296 const char *buf,
6297 size_t count)
6298 {
6299 struct rbd_device *rbd_dev = NULL;
6300 struct list_head *tmp;
6301 int dev_id;
6302 char opt_buf[6];
6303 bool already = false;
6304 bool force = false;
6305 int ret;
6306
6307 dev_id = -1;
6308 opt_buf[0] = '\0';
6309 sscanf(buf, "%d %5s", &dev_id, opt_buf);
6310 if (dev_id < 0) {
6311 pr_err("dev_id out of range\n");
6312 return -EINVAL;
6313 }
6314 if (opt_buf[0] != '\0') {
6315 if (!strcmp(opt_buf, "force")) {
6316 force = true;
6317 } else {
6318 pr_err("bad remove option at '%s'\n", opt_buf);
6319 return -EINVAL;
6320 }
6321 }
6322
6323 ret = -ENOENT;
6324 spin_lock(&rbd_dev_list_lock);
6325 list_for_each(tmp, &rbd_dev_list) {
6326 rbd_dev = list_entry(tmp, struct rbd_device, node);
6327 if (rbd_dev->dev_id == dev_id) {
6328 ret = 0;
6329 break;
6330 }
6331 }
6332 if (!ret) {
6333 spin_lock_irq(&rbd_dev->lock);
6334 if (rbd_dev->open_count && !force)
6335 ret = -EBUSY;
6336 else
6337 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6338 &rbd_dev->flags);
6339 spin_unlock_irq(&rbd_dev->lock);
6340 }
6341 spin_unlock(&rbd_dev_list_lock);
6342 if (ret < 0 || already)
6343 return ret;
6344
6345 if (force) {
6346 /*
6347 * Prevent new IO from being queued and wait for existing
6348 * IO to complete/fail.
6349 */
6350 blk_mq_freeze_queue(rbd_dev->disk->queue);
6351 blk_set_queue_dying(rbd_dev->disk->queue);
6352 }
6353
6354 del_gendisk(rbd_dev->disk);
6355 spin_lock(&rbd_dev_list_lock);
6356 list_del_init(&rbd_dev->node);
6357 spin_unlock(&rbd_dev_list_lock);
6358 device_del(&rbd_dev->dev);
6359
6360 rbd_dev_image_unlock(rbd_dev);
6361 rbd_dev_device_release(rbd_dev);
6362 rbd_dev_image_release(rbd_dev);
6363 rbd_dev_destroy(rbd_dev);
6364 return count;
6365 }
6366
6367 static ssize_t rbd_remove(struct bus_type *bus,
6368 const char *buf,
6369 size_t count)
6370 {
6371 if (single_major)
6372 return -EINVAL;
6373
6374 return do_rbd_remove(bus, buf, count);
6375 }
6376
6377 static ssize_t rbd_remove_single_major(struct bus_type *bus,
6378 const char *buf,
6379 size_t count)
6380 {
6381 return do_rbd_remove(bus, buf, count);
6382 }
6383
6384 /*
6385 * create control files in sysfs
6386 * /sys/bus/rbd/...
6387 */
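/*
 * Judging from the store callbacks above, the attributes registered
 * on rbd_bus_type (defined elsewhere in this file) include "add" and
 * "remove" plus their "_single_major" variants; the plain variants
 * reject writes with -EINVAL when the driver runs in single_major
 * mode.
 */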
6388 static int rbd_sysfs_init(void)
6389 {
6390 int ret;
6391
6392 ret = device_register(&rbd_root_dev);
6393 if (ret < 0)
6394 return ret;
6395
6396 ret = bus_register(&rbd_bus_type);
6397 if (ret < 0)
6398 device_unregister(&rbd_root_dev);
6399
6400 return ret;
6401 }
6402
6403 static void rbd_sysfs_cleanup(void)
6404 {
6405 bus_unregister(&rbd_bus_type);
6406 device_unregister(&rbd_root_dev);
6407 }
6408
6409 static int rbd_slab_init(void)
6410 {
6411 rbd_assert(!rbd_img_request_cache);
6412 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
6413 if (!rbd_img_request_cache)
6414 return -ENOMEM;
6415
6416 rbd_assert(!rbd_obj_request_cache);
6417 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
6418 if (!rbd_obj_request_cache)
6419 goto out_err;
6420
6421 rbd_assert(!rbd_bio_clone);
6422 rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0);
6423 if (!rbd_bio_clone)
6424 goto out_err_clone;
6425
6426 return 0;
6427
6428 out_err_clone:
6429 kmem_cache_destroy(rbd_obj_request_cache);
6430 rbd_obj_request_cache = NULL;
6431 out_err:
6432 kmem_cache_destroy(rbd_img_request_cache);
6433 rbd_img_request_cache = NULL;
6434 return -ENOMEM;
6435 }
6436
6437 static void rbd_slab_exit(void)
6438 {
6439 rbd_assert(rbd_obj_request_cache);
6440 kmem_cache_destroy(rbd_obj_request_cache);
6441 rbd_obj_request_cache = NULL;
6442
6443 rbd_assert(rbd_img_request_cache);
6444 kmem_cache_destroy(rbd_img_request_cache);
6445 rbd_img_request_cache = NULL;
6446
6447 rbd_assert(rbd_bio_clone);
6448 bioset_free(rbd_bio_clone);
6449 rbd_bio_clone = NULL;
6450 }
6451
6452 static int __init rbd_init(void)
6453 {
6454 int rc;
6455
6456 if (!libceph_compatible(NULL)) {
6457 rbd_warn(NULL, "libceph incompatibility (quitting)");
6458 return -EINVAL;
6459 }
6460
6461 rc = rbd_slab_init();
6462 if (rc)
6463 return rc;
6464
6465 /*
6466 * The number of active work items is limited by the number of
6467 * rbd devices * queue depth, so leave @max_active at default.
6468 */
6469 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6470 if (!rbd_wq) {
6471 rc = -ENOMEM;
6472 goto err_out_slab;
6473 }
6474
6475 if (single_major) {
6476 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6477 if (rbd_major < 0) {
6478 rc = rbd_major;
6479 goto err_out_wq;
6480 }
6481 }
6482
6483 rc = rbd_sysfs_init();
6484 if (rc)
6485 goto err_out_blkdev;
6486
6487 if (single_major)
6488 pr_info("loaded (major %d)\n", rbd_major);
6489 else
6490 pr_info("loaded\n");
6491
6492 return 0;
6493
6494 err_out_blkdev:
6495 if (single_major)
6496 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6497 err_out_wq:
6498 destroy_workqueue(rbd_wq);
6499 err_out_slab:
6500 rbd_slab_exit();
6501 return rc;
6502 }
6503
6504 static void __exit rbd_exit(void)
6505 {
6506 ida_destroy(&rbd_dev_id_ida);
6507 rbd_sysfs_cleanup();
6508 if (single_major)
6509 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6510 destroy_workqueue(rbd_wq);
6511 rbd_slab_exit();
6512 }
6513
6514 module_init(rbd_init);
6515 module_exit(rbd_exit);
6516
6517 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
6518 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6519 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
6520 /* following authorship retained from original osdblk.c */
6521 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6522
6523 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
6524 MODULE_LICENSE("GPL");