]> git.proxmox.com Git - ceph.git/blob - ceph/src/cls/rbd/cls_rbd.cc
update sources to v12.2.1
[ceph.git] / ceph / src / cls / rbd / cls_rbd.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /** \file
5 *
6 * This is an OSD class that implements methods for
7 * use with rbd.
8 *
9 * Most of these deal with the rbd header object. Methods prefixed
10 * with old_ deal with the original rbd design, in which clients read
11 * and interpreted the header object directly.
12 *
13 * The new format is meant to be opaque to clients - all their
14 * interactions with non-data objects should go through this
15 * class. The OSD class interface leaves the class to implement its
16 * own argument and payload serialization/deserialization, so for ease
17 * of implementation we use the existing ceph encoding/decoding
18 * methods. Something like json might be preferable, but the rbd
19 * kernel module has to be able to understand the format as well. The
20 * datatypes exposed to the clients are strings, unsigned integers,
21 * and vectors of those types. The on-wire format can be found in
22 * src/include/encoding.h.
23 *
24 * The methods for interacting with the new format document their
25 * parameters as the client sees them - it would be silly to mention
26 * in each one that they take an input and an output bufferlist.
27 */
28 #include "include/types.h"
29
30 #include <algorithm>
31 #include <errno.h>
32 #include <sstream>
33
34 #include "common/bit_vector.hpp"
35 #include "common/errno.h"
36 #include "objclass/objclass.h"
37 #include "osd/osd_types.h"
38 #include "include/rbd_types.h"
39 #include "include/rbd/object_map_types.h"
40
41 #include "cls/rbd/cls_rbd.h"
42 #include "cls/rbd/cls_rbd_types.h"
43
44
45 /*
46 * Object keys:
47 *
48 * <partial list>
49 *
50 * stripe_unit: size in bytes of the stripe unit. if not present,
51 * the stripe unit is assumed to match the object size (1 << order).
52 *
53 * stripe_count: number of objects to stripe over before looping back.
54 * if not present or 1, striping is disabled. this is the default.
55 *
56 */
57
58 CLS_VER(2,0)
59 CLS_NAME(rbd)
60
61 #define RBD_MAX_KEYS_READ 64
62 #define RBD_SNAP_KEY_PREFIX "snapshot_"
63 #define RBD_DIR_ID_KEY_PREFIX "id_"
64 #define RBD_DIR_NAME_KEY_PREFIX "name_"
65 #define RBD_METADATA_KEY_PREFIX "metadata_"
66
67 #define GROUP_SNAP_SEQ "snap_seq"
68
69 static int snap_read_header(cls_method_context_t hctx, bufferlist& bl)
70 {
71 unsigned snap_count = 0;
72 uint64_t snap_names_len = 0;
73 struct rbd_obj_header_ondisk *header;
74
75 CLS_LOG(20, "snapshots_list");
76
77 while (1) {
78 int len = sizeof(*header) +
79 snap_count * sizeof(struct rbd_obj_snap_ondisk) +
80 snap_names_len;
81
82 int rc = cls_cxx_read(hctx, 0, len, &bl);
83 if (rc < 0)
84 return rc;
85
86 if (bl.length() < sizeof(*header))
87 return -EINVAL;
88
89 header = (struct rbd_obj_header_ondisk *)bl.c_str();
90 assert(header);
91
92 if ((snap_count != header->snap_count) ||
93 (snap_names_len != header->snap_names_len)) {
94 snap_count = header->snap_count;
95 snap_names_len = header->snap_names_len;
96 bl.clear();
97 continue;
98 }
99 break;
100 }
101
102 return 0;
103 }
104
105 static void key_from_snap_id(snapid_t snap_id, string *out)
106 {
107 ostringstream oss;
108 oss << RBD_SNAP_KEY_PREFIX
109 << std::setw(16) << std::setfill('0') << std::hex << snap_id;
110 *out = oss.str();
111 }
112
113 static snapid_t snap_id_from_key(const string &key)
114 {
115 istringstream iss(key);
116 uint64_t id;
117 iss.ignore(strlen(RBD_SNAP_KEY_PREFIX)) >> std::hex >> id;
118 return id;
119 }
120
121 template<typename T>
122 static int read_key(cls_method_context_t hctx, const string &key, T *out)
123 {
124 bufferlist bl;
125 int r = cls_cxx_map_get_val(hctx, key, &bl);
126 if (r < 0) {
127 if (r != -ENOENT) {
128 CLS_ERR("error reading omap key %s: %s", key.c_str(), cpp_strerror(r).c_str());
129 }
130 return r;
131 }
132
133 try {
134 bufferlist::iterator it = bl.begin();
135 ::decode(*out, it);
136 } catch (const buffer::error &err) {
137 CLS_ERR("error decoding %s", key.c_str());
138 return -EIO;
139 }
140
141 return 0;
142 }
143
144 static int remove_key(cls_method_context_t hctx, const string &key) {
145 int r = cls_cxx_map_remove_key(hctx, key);
146 if (r < 0 && r != -ENOENT) {
147 CLS_ERR("failed to remove key: %s", key.c_str());
148 return r;
149 }
150 return 0;
151 }
152
/**
 * Check whether an id is acceptable: non-empty and made up solely of
 * characters for which isalnum() is true.
 *
 * @param id id to validate
 * @return true if the id is valid, false otherwise
 */
static bool is_valid_id(const string &id) {
  if (id.empty()) {
    return false;
  }
  for (size_t i = 0; i < id.size(); ++i) {
    // isalnum() is only defined for EOF and values representable as
    // unsigned char; a plain (possibly signed) char >= 0x80 would be negative.
    if (!isalnum(static_cast<unsigned char>(id[i]))) {
      return false;
    }
  }
  return true;
}
163
164 /**
165 * Initialize the header with basic metadata.
166 * Extra features may initialize more fields in the future.
167 * Everything is stored as key/value pairs as omaps in the header object.
168 *
169 * If features the OSD does not understand are requested, -ENOSYS is
170 * returned.
171 *
172 * Input:
173 * @param size number of bytes in the image (uint64_t)
174 * @param order bits to shift to determine the size of data objects (uint8_t)
175 * @param features what optional things this image will use (uint64_t)
176 * @param object_prefix a prefix for all the data objects
177 * @param data_pool_id pool id where data objects is stored (int64_t)
178 *
179 * Output:
180 * @return 0 on success, negative error code on failure
181 */
182 int create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
183 {
184 string object_prefix;
185 uint64_t features, size;
186 uint8_t order;
187 int64_t data_pool_id = -1;
188
189 try {
190 bufferlist::iterator iter = in->begin();
191 ::decode(size, iter);
192 ::decode(order, iter);
193 ::decode(features, iter);
194 ::decode(object_prefix, iter);
195 if (!iter.end()) {
196 ::decode(data_pool_id, iter);
197 }
198 } catch (const buffer::error &err) {
199 return -EINVAL;
200 }
201
202 CLS_LOG(20, "create object_prefix=%s size=%llu order=%u features=%llu",
203 object_prefix.c_str(), (unsigned long long)size, order,
204 (unsigned long long)features);
205
206 if (features & ~RBD_FEATURES_ALL) {
207 return -ENOSYS;
208 }
209
210 if (!object_prefix.size()) {
211 return -EINVAL;
212 }
213
214 bufferlist stored_prefixbl;
215 int r = cls_cxx_map_get_val(hctx, "object_prefix", &stored_prefixbl);
216 if (r != -ENOENT) {
217 CLS_ERR("reading object_prefix returned %d", r);
218 return -EEXIST;
219 }
220
221 bufferlist sizebl;
222 bufferlist orderbl;
223 bufferlist featuresbl;
224 bufferlist object_prefixbl;
225 bufferlist snap_seqbl;
226 bufferlist create_timestampbl;
227 uint64_t snap_seq = 0;
228 utime_t create_timestamp = ceph_clock_now();
229 ::encode(size, sizebl);
230 ::encode(order, orderbl);
231 ::encode(features, featuresbl);
232 ::encode(object_prefix, object_prefixbl);
233 ::encode(snap_seq, snap_seqbl);
234 ::encode(create_timestamp, create_timestampbl);
235
236 map<string, bufferlist> omap_vals;
237 omap_vals["size"] = sizebl;
238 omap_vals["order"] = orderbl;
239 omap_vals["features"] = featuresbl;
240 omap_vals["object_prefix"] = object_prefixbl;
241 omap_vals["snap_seq"] = snap_seqbl;
242 omap_vals["create_timestamp"] = create_timestampbl;
243
244 if (features & RBD_FEATURE_DATA_POOL) {
245 if (data_pool_id == -1) {
246 CLS_ERR("data pool not provided with feature enabled");
247 return -EINVAL;
248 }
249
250 bufferlist data_pool_id_bl;
251 ::encode(data_pool_id, data_pool_id_bl);
252 omap_vals["data_pool_id"] = data_pool_id_bl;
253 } else if (data_pool_id != -1) {
254 CLS_ERR("data pool provided with feature disabled");
255 return -EINVAL;
256 }
257
258 r = cls_cxx_map_set_vals(hctx, &omap_vals);
259 if (r < 0)
260 return r;
261
262 return 0;
263 }
264
265 /**
266 * Input:
267 * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t) (deprecated)
268 * @param read_only true if the image will be used read-only (bool)
269 *
270 * Output:
271 * @param features list of enabled features for the given snapshot (uint64_t)
272 * @param incompatible incompatible feature bits
273 * @returns 0 on success, negative error code on failure
274 */
275 int get_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
276 {
277 uint64_t snap_id;
278 bool read_only = false;
279
280 bufferlist::iterator iter = in->begin();
281 try {
282 ::decode(snap_id, iter);
283 if (!iter.end()) {
284 ::decode(read_only, iter);
285 }
286 } catch (const buffer::error &err) {
287 return -EINVAL;
288 }
289
290 CLS_LOG(20, "get_features snap_id=%" PRIu64 ", read_only=%d",
291 snap_id, read_only);
292
293 // NOTE: keep this deprecated snapshot logic to support negative
294 // test cases in older (pre-Infernalis) releases. Remove once older
295 // releases are no longer supported.
296 if (snap_id != CEPH_NOSNAP) {
297 cls_rbd_snap snap;
298 string snapshot_key;
299 key_from_snap_id(snap_id, &snapshot_key);
300 int r = read_key(hctx, snapshot_key, &snap);
301 if (r < 0) {
302 return r;
303 }
304 }
305
306 uint64_t features;
307 int r = read_key(hctx, "features", &features);
308 if (r < 0) {
309 CLS_ERR("failed to read features off disk: %s", cpp_strerror(r).c_str());
310 return r;
311 }
312
313 uint64_t incompatible = (read_only ? features & RBD_FEATURES_INCOMPATIBLE :
314 features & RBD_FEATURES_RW_INCOMPATIBLE);
315 ::encode(features, *out);
316 ::encode(incompatible, *out);
317 return 0;
318 }
319
320 /**
321 * set the image features
322 *
323 * Input:
324 * @param features image features
325 * @param mask image feature mask
326 *
327 * Output:
328 * none
329 *
330 * @returns 0 on success, negative error code upon failure
331 */
332 int set_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
333 {
334 uint64_t features;
335 uint64_t mask;
336 bufferlist::iterator iter = in->begin();
337 try {
338 ::decode(features, iter);
339 ::decode(mask, iter);
340 } catch (const buffer::error &err) {
341 return -EINVAL;
342 }
343
344 // check that features exists to make sure this is a header object
345 // that was created correctly
346 uint64_t orig_features = 0;
347 int r = read_key(hctx, "features", &orig_features);
348 if (r < 0 && r != -ENOENT) {
349 CLS_ERR("Could not read image's features off disk: %s",
350 cpp_strerror(r).c_str());
351 return r;
352 }
353
354 // newer clients might attempt to mask off features we don't support
355 mask &= RBD_FEATURES_ALL;
356
357 uint64_t enabled_features = features & mask;
358 if ((enabled_features & RBD_FEATURES_MUTABLE) != enabled_features) {
359 CLS_ERR("Attempting to enable immutable feature: %" PRIu64,
360 static_cast<uint64_t>(enabled_features & ~RBD_FEATURES_MUTABLE));
361 return -EINVAL;
362 }
363
364 uint64_t disabled_features = ~features & mask;
365 uint64_t disable_mask = (RBD_FEATURES_MUTABLE | RBD_FEATURES_DISABLE_ONLY);
366 if ((disabled_features & disable_mask) != disabled_features) {
367 CLS_ERR("Attempting to disable immutable feature: %" PRIu64,
368 enabled_features & ~disable_mask);
369 return -EINVAL;
370 }
371
372 features = (orig_features & ~mask) | (features & mask);
373 CLS_LOG(10, "set_features features=%" PRIu64 " orig_features=%" PRIu64,
374 features, orig_features);
375
376 bufferlist bl;
377 ::encode(features, bl);
378 r = cls_cxx_map_set_val(hctx, "features", &bl);
379 if (r < 0) {
380 CLS_ERR("error updating features: %s", cpp_strerror(r).c_str());
381 return r;
382 }
383 return 0;
384 }
385
386 /**
387 * check that given feature(s) are set
388 *
389 * @param hctx context
390 * @param need features needed
391 * @return 0 if features are set, negative error (like ENOEXEC) otherwise
392 */
393 int require_feature(cls_method_context_t hctx, uint64_t need)
394 {
395 uint64_t features;
396 int r = read_key(hctx, "features", &features);
397 if (r == -ENOENT) // this implies it's an old-style image with no features
398 return -ENOEXEC;
399 if (r < 0)
400 return r;
401 if ((features & need) != need) {
402 CLS_LOG(10, "require_feature missing feature %llx, have %llx",
403 (unsigned long long)need, (unsigned long long)features);
404 return -ENOEXEC;
405 }
406 return 0;
407 }
408
409 /**
410 * Input:
411 * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t)
412 *
413 * Output:
414 * @param order bits to shift to get the size of data objects (uint8_t)
415 * @param size size of the image in bytes for the given snapshot (uint64_t)
416 * @returns 0 on success, negative error code on failure
417 */
418 int get_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
419 {
420 uint64_t snap_id, size;
421 uint8_t order;
422
423 bufferlist::iterator iter = in->begin();
424 try {
425 ::decode(snap_id, iter);
426 } catch (const buffer::error &err) {
427 return -EINVAL;
428 }
429
430 CLS_LOG(20, "get_size snap_id=%llu", (unsigned long long)snap_id);
431
432 int r = read_key(hctx, "order", &order);
433 if (r < 0) {
434 CLS_ERR("failed to read the order off of disk: %s", cpp_strerror(r).c_str());
435 return r;
436 }
437
438 if (snap_id == CEPH_NOSNAP) {
439 r = read_key(hctx, "size", &size);
440 if (r < 0) {
441 CLS_ERR("failed to read the image's size off of disk: %s", cpp_strerror(r).c_str());
442 return r;
443 }
444 } else {
445 cls_rbd_snap snap;
446 string snapshot_key;
447 key_from_snap_id(snap_id, &snapshot_key);
448 int r = read_key(hctx, snapshot_key, &snap);
449 if (r < 0)
450 return r;
451
452 size = snap.image_size;
453 }
454
455 ::encode(order, *out);
456 ::encode(size, *out);
457
458 return 0;
459 }
460
461 /**
462 * Input:
463 * @param size new capacity of the image in bytes (uint64_t)
464 *
465 * Output:
466 * @returns 0 on success, negative error code on failure
467 */
468 int set_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
469 {
470 uint64_t size;
471
472 bufferlist::iterator iter = in->begin();
473 try {
474 ::decode(size, iter);
475 } catch (const buffer::error &err) {
476 return -EINVAL;
477 }
478
479 // check that size exists to make sure this is a header object
480 // that was created correctly
481 uint64_t orig_size;
482 int r = read_key(hctx, "size", &orig_size);
483 if (r < 0) {
484 CLS_ERR("Could not read image's size off disk: %s", cpp_strerror(r).c_str());
485 return r;
486 }
487
488 CLS_LOG(20, "set_size size=%llu orig_size=%llu", (unsigned long long)size,
489 (unsigned long long)orig_size);
490
491 bufferlist sizebl;
492 ::encode(size, sizebl);
493 r = cls_cxx_map_set_val(hctx, "size", &sizebl);
494 if (r < 0) {
495 CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
496 return r;
497 }
498
499 // if we are shrinking, and have a parent, shrink our overlap with
500 // the parent, too.
501 if (size < orig_size) {
502 cls_rbd_parent parent;
503 r = read_key(hctx, "parent", &parent);
504 if (r == -ENOENT)
505 r = 0;
506 if (r < 0)
507 return r;
508 if (parent.exists() && parent.overlap > size) {
509 bufferlist parentbl;
510 parent.overlap = size;
511 ::encode(parent, parentbl);
512 r = cls_cxx_map_set_val(hctx, "parent", &parentbl);
513 if (r < 0) {
514 CLS_ERR("error writing parent: %s", cpp_strerror(r).c_str());
515 return r;
516 }
517 }
518 }
519
520 return 0;
521 }
522
523 /**
524 * verify that the header object exists
525 *
526 * @return 0 if the object exists, -ENOENT if it does not, or other error
527 */
528 int check_exists(cls_method_context_t hctx)
529 {
530 uint64_t size;
531 time_t mtime;
532 return cls_cxx_stat(hctx, &size, &mtime);
533 }
534
535 /**
536 * get the current protection status of the specified snapshot
537 *
538 * Input:
539 * @param snap_id (uint64_t) which snapshot to get the status of
540 *
541 * Output:
542 * @param status (uint8_t) one of:
543 * RBD_PROTECTION_STATUS_{PROTECTED, UNPROTECTED, UNPROTECTING}
544 *
545 * @returns 0 on success, negative error code on failure
546 * @returns -EINVAL if snapid is CEPH_NOSNAP
547 */
548 int get_protection_status(cls_method_context_t hctx, bufferlist *in,
549 bufferlist *out)
550 {
551 snapid_t snap_id;
552
553 bufferlist::iterator iter = in->begin();
554 try {
555 ::decode(snap_id, iter);
556 } catch (const buffer::error &err) {
557 CLS_LOG(20, "get_protection_status: invalid decode");
558 return -EINVAL;
559 }
560
561 int r = check_exists(hctx);
562 if (r < 0)
563 return r;
564
565 CLS_LOG(20, "get_protection_status snap_id=%llu",
566 (unsigned long long)snap_id.val);
567
568 if (snap_id == CEPH_NOSNAP)
569 return -EINVAL;
570
571 cls_rbd_snap snap;
572 string snapshot_key;
573 key_from_snap_id(snap_id.val, &snapshot_key);
574 r = read_key(hctx, snapshot_key, &snap);
575 if (r < 0) {
576 CLS_ERR("could not read key for snapshot id %" PRIu64, snap_id.val);
577 return r;
578 }
579
580 if (snap.protection_status >= RBD_PROTECTION_STATUS_LAST) {
581 CLS_ERR("invalid protection status for snap id %llu: %u",
582 (unsigned long long)snap_id.val, snap.protection_status);
583 return -EIO;
584 }
585
586 ::encode(snap.protection_status, *out);
587 return 0;
588 }
589
590 /**
591 * set the proctection status of a snapshot
592 *
593 * Input:
594 * @param snapid (uint64_t) which snapshot to set the status of
595 * @param status (uint8_t) one of:
596 * RBD_PROTECTION_STATUS_{PROTECTED, UNPROTECTED, UNPROTECTING}
597 *
598 * @returns 0 on success, negative error code on failure
599 * @returns -EINVAL if snapid is CEPH_NOSNAP
600 */
601 int set_protection_status(cls_method_context_t hctx, bufferlist *in,
602 bufferlist *out)
603 {
604 snapid_t snap_id;
605 uint8_t status;
606
607 bufferlist::iterator iter = in->begin();
608 try {
609 ::decode(snap_id, iter);
610 ::decode(status, iter);
611 } catch (const buffer::error &err) {
612 CLS_LOG(20, "set_protection_status: invalid decode");
613 return -EINVAL;
614 }
615
616 int r = check_exists(hctx);
617 if (r < 0)
618 return r;
619
620 r = require_feature(hctx, RBD_FEATURE_LAYERING);
621 if (r < 0) {
622 CLS_LOG(20, "image does not support layering");
623 return r;
624 }
625
626 CLS_LOG(20, "set_protection_status snapid=%llu status=%u",
627 (unsigned long long)snap_id.val, status);
628
629 if (snap_id == CEPH_NOSNAP)
630 return -EINVAL;
631
632 if (status >= RBD_PROTECTION_STATUS_LAST) {
633 CLS_LOG(10, "invalid protection status for snap id %llu: %u",
634 (unsigned long long)snap_id.val, status);
635 return -EINVAL;
636 }
637
638 cls_rbd_snap snap;
639 string snapshot_key;
640 key_from_snap_id(snap_id.val, &snapshot_key);
641 r = read_key(hctx, snapshot_key, &snap);
642 if (r < 0) {
643 CLS_ERR("could not read key for snapshot id %" PRIu64, snap_id.val);
644 return r;
645 }
646
647 snap.protection_status = status;
648 bufferlist snapshot_bl;
649 ::encode(snap, snapshot_bl);
650 r = cls_cxx_map_set_val(hctx, snapshot_key, &snapshot_bl);
651 if (r < 0) {
652 CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
653 return r;
654 }
655
656 return 0;
657 }
658
659 /**
660 * get striping parameters
661 *
662 * Input:
663 * none
664 *
665 * Output:
666 * @param stripe unit (bytes)
667 * @param stripe count (num objects)
668 *
669 * @returns 0 on success
670 */
671 int get_stripe_unit_count(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
672 {
673 int r = check_exists(hctx);
674 if (r < 0)
675 return r;
676
677 CLS_LOG(20, "get_stripe_unit_count");
678
679 r = require_feature(hctx, RBD_FEATURE_STRIPINGV2);
680 if (r < 0)
681 return r;
682
683 uint64_t stripe_unit = 0, stripe_count = 0;
684 r = read_key(hctx, "stripe_unit", &stripe_unit);
685 if (r == -ENOENT) {
686 // default to object size
687 uint8_t order;
688 r = read_key(hctx, "order", &order);
689 if (r < 0) {
690 CLS_ERR("failed to read the order off of disk: %s", cpp_strerror(r).c_str());
691 return -EIO;
692 }
693 stripe_unit = 1ull << order;
694 }
695 if (r < 0)
696 return r;
697 r = read_key(hctx, "stripe_count", &stripe_count);
698 if (r == -ENOENT) {
699 // default to 1
700 stripe_count = 1;
701 r = 0;
702 }
703 if (r < 0)
704 return r;
705
706 ::encode(stripe_unit, *out);
707 ::encode(stripe_count, *out);
708 return 0;
709 }
710
711 /**
712 * set striping parameters
713 *
714 * Input:
715 * @param stripe unit (bytes)
716 * @param stripe count (num objects)
717 *
718 * @returns 0 on success
719 */
720 int set_stripe_unit_count(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
721 {
722 uint64_t stripe_unit, stripe_count;
723
724 bufferlist::iterator iter = in->begin();
725 try {
726 ::decode(stripe_unit, iter);
727 ::decode(stripe_count, iter);
728 } catch (const buffer::error &err) {
729 CLS_LOG(20, "set_stripe_unit_count: invalid decode");
730 return -EINVAL;
731 }
732
733 if (!stripe_count || !stripe_unit)
734 return -EINVAL;
735
736 int r = check_exists(hctx);
737 if (r < 0)
738 return r;
739
740 CLS_LOG(20, "set_stripe_unit_count");
741
742 r = require_feature(hctx, RBD_FEATURE_STRIPINGV2);
743 if (r < 0)
744 return r;
745
746 uint8_t order;
747 r = read_key(hctx, "order", &order);
748 if (r < 0) {
749 CLS_ERR("failed to read the order off of disk: %s", cpp_strerror(r).c_str());
750 return r;
751 }
752 if ((1ull << order) % stripe_unit || stripe_unit > (1ull << order)) {
753 CLS_ERR("stripe unit %llu is not a factor of the object size %llu",
754 (unsigned long long)stripe_unit, 1ull << order);
755 return -EINVAL;
756 }
757
758 bufferlist bl, bl2;
759 ::encode(stripe_unit, bl);
760 r = cls_cxx_map_set_val(hctx, "stripe_unit", &bl);
761 if (r < 0) {
762 CLS_ERR("error writing stripe_unit metadata: %s", cpp_strerror(r).c_str());
763 return r;
764 }
765
766 ::encode(stripe_count, bl2);
767 r = cls_cxx_map_set_val(hctx, "stripe_count", &bl2);
768 if (r < 0) {
769 CLS_ERR("error writing stripe_count metadata: %s", cpp_strerror(r).c_str());
770 return r;
771 }
772
773 return 0;
774 }
775
776 int get_create_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
777 {
778 CLS_LOG(20, "get_create_timestamp");
779
780 utime_t timestamp;
781 bufferlist bl;
782 int r = cls_cxx_map_get_val(hctx, "create_timestamp", &bl);
783 if (r < 0) {
784 if (r != -ENOENT) {
785 CLS_ERR("error reading create_timestamp: %s", cpp_strerror(r).c_str());
786 return r;
787 }
788 } else {
789 try {
790 bufferlist::iterator it = bl.begin();
791 ::decode(timestamp, it);
792 } catch (const buffer::error &err) {
793 CLS_ERR("could not decode create_timestamp");
794 return -EIO;
795 }
796 }
797
798 ::encode(timestamp, *out);
799 return 0;
800 }
801
802 /**
803 * get the image flags
804 *
805 * Input:
806 * @param snap_id which snapshot to query, to CEPH_NOSNAP (uint64_t)
807 *
808 * Output:
809 * @param flags image flags
810 *
811 * @returns 0 on success, negative error code upon failure
812 */
813 int get_flags(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
814 {
815 uint64_t snap_id;
816 bufferlist::iterator iter = in->begin();
817 try {
818 ::decode(snap_id, iter);
819 } catch (const buffer::error &err) {
820 return -EINVAL;
821 }
822
823 CLS_LOG(20, "get_flags snap_id=%llu", (unsigned long long)snap_id);
824
825 uint64_t flags = 0;
826 if (snap_id == CEPH_NOSNAP) {
827 int r = read_key(hctx, "flags", &flags);
828 if (r < 0 && r != -ENOENT) {
829 CLS_ERR("failed to read flags off disk: %s", cpp_strerror(r).c_str());
830 return r;
831 }
832 } else {
833 cls_rbd_snap snap;
834 string snapshot_key;
835 key_from_snap_id(snap_id, &snapshot_key);
836 int r = read_key(hctx, snapshot_key, &snap);
837 if (r < 0) {
838 return r;
839 }
840 flags = snap.flags;
841 }
842
843 ::encode(flags, *out);
844 return 0;
845 }
846
847 /**
848 * set the image flags
849 *
850 * Input:
851 * @param flags image flags
852 * @param mask image flag mask
853 * @param snap_id which snapshot to update, or CEPH_NOSNAP (uint64_t)
854 *
855 * Output:
856 * none
857 *
858 * @returns 0 on success, negative error code upon failure
859 */
860 int set_flags(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
861 {
862 uint64_t flags;
863 uint64_t mask;
864 uint64_t snap_id = CEPH_NOSNAP;
865 bufferlist::iterator iter = in->begin();
866 try {
867 ::decode(flags, iter);
868 ::decode(mask, iter);
869 if (!iter.end()) {
870 ::decode(snap_id, iter);
871 }
872 } catch (const buffer::error &err) {
873 return -EINVAL;
874 }
875
876 // check that size exists to make sure this is a header object
877 // that was created correctly
878 int r;
879 uint64_t orig_flags = 0;
880 cls_rbd_snap snap_meta;
881 string snap_meta_key;
882 if (snap_id == CEPH_NOSNAP) {
883 r = read_key(hctx, "flags", &orig_flags);
884 if (r < 0 && r != -ENOENT) {
885 CLS_ERR("Could not read image's flags off disk: %s",
886 cpp_strerror(r).c_str());
887 return r;
888 }
889 } else {
890 key_from_snap_id(snap_id, &snap_meta_key);
891 r = read_key(hctx, snap_meta_key, &snap_meta);
892 if (r < 0) {
893 CLS_ERR("Could not read snapshot: snap_id=%" PRIu64 ": %s",
894 snap_id, cpp_strerror(r).c_str());
895 return r;
896 }
897 orig_flags = snap_meta.flags;
898 }
899
900 flags = (orig_flags & ~mask) | (flags & mask);
901 CLS_LOG(20, "set_flags snap_id=%" PRIu64 ", orig_flags=%" PRIu64 ", "
902 "new_flags=%" PRIu64 ", mask=%" PRIu64, snap_id, orig_flags,
903 flags, mask);
904
905 if (snap_id == CEPH_NOSNAP) {
906 bufferlist bl;
907 ::encode(flags, bl);
908 r = cls_cxx_map_set_val(hctx, "flags", &bl);
909 } else {
910 snap_meta.flags = flags;
911
912 bufferlist bl;
913 ::encode(snap_meta, bl);
914 r = cls_cxx_map_set_val(hctx, snap_meta_key, &bl);
915 }
916
917 if (r < 0) {
918 CLS_ERR("error updating flags: %s", cpp_strerror(r).c_str());
919 return r;
920 }
921 return 0;
922 }
923
924 /**
925 * get the current parent, if any
926 *
927 * Input:
928 * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t)
929 *
930 * Output:
931 * @param pool parent pool id (-1 if parent does not exist)
932 * @param image parent image id
933 * @param snapid parent snapid
934 * @param size portion of parent mapped under the child
935 *
936 * @returns 0 on success or parent does not exist, negative error code on failure
937 */
938 int get_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
939 {
940 uint64_t snap_id;
941
942 bufferlist::iterator iter = in->begin();
943 try {
944 ::decode(snap_id, iter);
945 } catch (const buffer::error &err) {
946 return -EINVAL;
947 }
948
949 int r = check_exists(hctx);
950 if (r < 0)
951 return r;
952
953 CLS_LOG(20, "get_parent snap_id=%llu", (unsigned long long)snap_id);
954
955 cls_rbd_parent parent;
956 r = require_feature(hctx, RBD_FEATURE_LAYERING);
957 if (r == 0) {
958 if (snap_id == CEPH_NOSNAP) {
959 r = read_key(hctx, "parent", &parent);
960 if (r < 0 && r != -ENOENT)
961 return r;
962 } else {
963 cls_rbd_snap snap;
964 string snapshot_key;
965 key_from_snap_id(snap_id, &snapshot_key);
966 r = read_key(hctx, snapshot_key, &snap);
967 if (r < 0 && r != -ENOENT)
968 return r;
969 parent = snap.parent;
970 }
971 }
972
973 ::encode(parent.pool, *out);
974 ::encode(parent.id, *out);
975 ::encode(parent.snapid, *out);
976 ::encode(parent.overlap, *out);
977 return 0;
978 }
979
980 /**
981 * set the image parent
982 *
983 * Input:
984 * @param pool parent pool
985 * @param id parent image id
986 * @param snapid parent snapid
987 * @param size parent size
988 *
989 * @returns 0 on success, or negative error code
990 */
991 int set_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
992 {
993 int64_t pool;
994 string id;
995 snapid_t snapid;
996 uint64_t size;
997
998 bufferlist::iterator iter = in->begin();
999 try {
1000 ::decode(pool, iter);
1001 ::decode(id, iter);
1002 ::decode(snapid, iter);
1003 ::decode(size, iter);
1004 } catch (const buffer::error &err) {
1005 CLS_LOG(20, "cls_rbd::set_parent: invalid decode");
1006 return -EINVAL;
1007 }
1008
1009 int r = check_exists(hctx);
1010 if (r < 0) {
1011 CLS_LOG(20, "cls_rbd::set_parent: child already exists");
1012 return r;
1013 }
1014
1015 r = require_feature(hctx, RBD_FEATURE_LAYERING);
1016 if (r < 0) {
1017 CLS_LOG(20, "cls_rbd::set_parent: child does not support layering");
1018 return r;
1019 }
1020
1021 CLS_LOG(20, "set_parent pool=%llu id=%s snapid=%llu size=%llu",
1022 (unsigned long long)pool, id.c_str(), (unsigned long long)snapid.val,
1023 (unsigned long long)size);
1024
1025 if (pool < 0 || id.length() == 0 || snapid == CEPH_NOSNAP || size == 0) {
1026 return -EINVAL;
1027 }
1028
1029 // make sure there isn't already a parent
1030 cls_rbd_parent parent;
1031 r = read_key(hctx, "parent", &parent);
1032 if (r == 0) {
1033 CLS_LOG(20, "set_parent existing parent pool=%llu id=%s snapid=%llu"
1034 "overlap=%llu", (unsigned long long)parent.pool, parent.id.c_str(),
1035 (unsigned long long)parent.snapid.val,
1036 (unsigned long long)parent.overlap);
1037 return -EEXIST;
1038 }
1039
1040 // our overlap is the min of our size and the parent's size.
1041 uint64_t our_size;
1042 r = read_key(hctx, "size", &our_size);
1043 if (r < 0)
1044 return r;
1045
1046 bufferlist parentbl;
1047 parent.pool = pool;
1048 parent.id = id;
1049 parent.snapid = snapid;
1050 parent.overlap = MIN(our_size, size);
1051 ::encode(parent, parentbl);
1052 r = cls_cxx_map_set_val(hctx, "parent", &parentbl);
1053 if (r < 0) {
1054 CLS_ERR("error writing parent: %s", cpp_strerror(r).c_str());
1055 return r;
1056 }
1057
1058 return 0;
1059 }
1060
1061
1062 /**
1063 * remove the parent pointer
1064 *
1065 * This can only happen on the head, not on a snapshot. No arguments.
1066 *
1067 * @returns 0 on success, negative error code on failure.
1068 */
1069 int remove_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1070 {
1071 int r = check_exists(hctx);
1072 if (r < 0)
1073 return r;
1074
1075 r = require_feature(hctx, RBD_FEATURE_LAYERING);
1076 if (r < 0)
1077 return r;
1078
1079 uint64_t features;
1080 r = read_key(hctx, "features", &features);
1081 if (r < 0) {
1082 return r;
1083 }
1084
1085 // remove the parent from all snapshots
1086 if ((features & RBD_FEATURE_DEEP_FLATTEN) != 0) {
1087 int max_read = RBD_MAX_KEYS_READ;
1088 vector<snapid_t> snap_ids;
1089 string last_read = RBD_SNAP_KEY_PREFIX;
1090 bool more;
1091
1092 do {
1093 set<string> keys;
1094 r = cls_cxx_map_get_keys(hctx, last_read, max_read, &keys, &more);
1095 if (r < 0) {
1096 return r;
1097 }
1098
1099 for (std::set<string>::const_iterator it = keys.begin();
1100 it != keys.end(); ++it) {
1101 if ((*it).find(RBD_SNAP_KEY_PREFIX) != 0) {
1102 break;
1103 }
1104
1105 uint64_t snap_id = snap_id_from_key(*it);
1106 cls_rbd_snap snap_meta;
1107 r = read_key(hctx, *it, &snap_meta);
1108 if (r < 0) {
1109 CLS_ERR("Could not read snapshot: snap_id=%" PRIu64 ": %s",
1110 snap_id, cpp_strerror(r).c_str());
1111 return r;
1112 }
1113
1114 snap_meta.parent = cls_rbd_parent();
1115
1116 bufferlist bl;
1117 ::encode(snap_meta, bl);
1118 r = cls_cxx_map_set_val(hctx, *it, &bl);
1119 if (r < 0) {
1120 CLS_ERR("Could not update snapshot: snap_id=%" PRIu64 ": %s",
1121 snap_id, cpp_strerror(r).c_str());
1122 return r;
1123 }
1124 }
1125
1126 if (!keys.empty()) {
1127 last_read = *(keys.rbegin());
1128 }
1129 } while (more);
1130 }
1131
1132 cls_rbd_parent parent;
1133 r = read_key(hctx, "parent", &parent);
1134 if (r < 0)
1135 return r;
1136
1137 r = cls_cxx_map_remove_key(hctx, "parent");
1138 if (r < 0) {
1139 CLS_ERR("error removing parent: %s", cpp_strerror(r).c_str());
1140 return r;
1141 }
1142 return 0;
1143 }
1144
1145 /**
1146 * methods for dealing with rbd_children object
1147 */
1148
1149 static int decode_parent_common(bufferlist::iterator& it, uint64_t *pool_id,
1150 string *image_id, snapid_t *snap_id)
1151 {
1152 try {
1153 ::decode(*pool_id, it);
1154 ::decode(*image_id, it);
1155 ::decode(*snap_id, it);
1156 } catch (const buffer::error &err) {
1157 CLS_ERR("error decoding parent spec");
1158 return -EINVAL;
1159 }
1160 return 0;
1161 }
1162
1163 static int decode_parent(bufferlist *in, uint64_t *pool_id,
1164 string *image_id, snapid_t *snap_id)
1165 {
1166 bufferlist::iterator it = in->begin();
1167 return decode_parent_common(it, pool_id, image_id, snap_id);
1168 }
1169
1170 static int decode_parent_and_child(bufferlist *in, uint64_t *pool_id,
1171 string *image_id, snapid_t *snap_id,
1172 string *c_image_id)
1173 {
1174 bufferlist::iterator it = in->begin();
1175 int r = decode_parent_common(it, pool_id, image_id, snap_id);
1176 if (r < 0)
1177 return r;
1178 try {
1179 ::decode(*c_image_id, it);
1180 } catch (const buffer::error &err) {
1181 CLS_ERR("error decoding child image id");
1182 return -EINVAL;
1183 }
1184 return 0;
1185 }
1186
1187 static string parent_key(uint64_t pool_id, string image_id, snapid_t snap_id)
1188 {
1189 bufferlist key_bl;
1190 ::encode(pool_id, key_bl);
1191 ::encode(image_id, key_bl);
1192 ::encode(snap_id, key_bl);
1193 return string(key_bl.c_str(), key_bl.length());
1194 }
1195
1196 /**
1197 * add child to rbd_children directory object
1198 *
1199 * rbd_children is a map of (p_pool_id, p_image_id, p_snap_id) to
1200 * [c_image_id, [c_image_id ... ]]
1201 *
1202 * Input:
1203 * @param p_pool_id parent pool id
1204 * @param p_image_id parent image oid
1205 * @param p_snap_id parent snapshot id
1206 * @param c_image_id new child image oid to add
1207 *
1208 * @returns 0 on success, negative error on failure
1209 */
1210
1211 int add_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1212 {
1213 int r;
1214
1215 uint64_t p_pool_id;
1216 snapid_t p_snap_id;
1217 string p_image_id, c_image_id;
1218 // Use set for ease of erase() for remove_child()
1219 std::set<string> children;
1220
1221 r = decode_parent_and_child(in, &p_pool_id, &p_image_id, &p_snap_id,
1222 &c_image_id);
1223 if (r < 0)
1224 return r;
1225
1226 CLS_LOG(20, "add_child %s to (%" PRIu64 ", %s, %" PRIu64 ")", c_image_id.c_str(),
1227 p_pool_id, p_image_id.c_str(), p_snap_id.val);
1228
1229 string key = parent_key(p_pool_id, p_image_id, p_snap_id);
1230
1231 // get current child list for parent, if any
1232 r = read_key(hctx, key, &children);
1233 if ((r < 0) && (r != -ENOENT)) {
1234 CLS_LOG(20, "add_child: omap read failed: %s", cpp_strerror(r).c_str());
1235 return r;
1236 }
1237
1238 if (children.find(c_image_id) != children.end()) {
1239 CLS_LOG(20, "add_child: child already exists: %s", c_image_id.c_str());
1240 return -EEXIST;
1241 }
1242 // add new child
1243 children.insert(c_image_id);
1244
1245 // write back
1246 bufferlist childbl;
1247 ::encode(children, childbl);
1248 r = cls_cxx_map_set_val(hctx, key, &childbl);
1249 if (r < 0)
1250 CLS_LOG(20, "add_child: omap write failed: %s", cpp_strerror(r).c_str());
1251 return r;
1252 }
1253
1254 /**
1255 * remove child from rbd_children directory object
1256 *
1257 * Input:
1258 * @param p_pool_id parent pool id
1259 * @param p_image_id parent image oid
1260 * @param p_snap_id parent snapshot id
1261 * @param c_image_id new child image oid to add
1262 *
1263 * @returns 0 on success, negative error on failure
1264 */
1265
1266 int remove_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1267 {
1268 int r;
1269
1270 uint64_t p_pool_id;
1271 snapid_t p_snap_id;
1272 string p_image_id, c_image_id;
1273 std::set<string> children;
1274
1275 r = decode_parent_and_child(in, &p_pool_id, &p_image_id, &p_snap_id,
1276 &c_image_id);
1277 if (r < 0)
1278 return r;
1279
1280 CLS_LOG(20, "remove_child %s from (%" PRIu64 ", %s, %" PRIu64 ")",
1281 c_image_id.c_str(), p_pool_id, p_image_id.c_str(),
1282 p_snap_id.val);
1283
1284 string key = parent_key(p_pool_id, p_image_id, p_snap_id);
1285
1286 // get current child list for parent. Unlike add_child(), an empty list
1287 // is an error (how can we remove something that doesn't exist?)
1288 r = read_key(hctx, key, &children);
1289 if (r < 0) {
1290 CLS_LOG(20, "remove_child: read omap failed: %s", cpp_strerror(r).c_str());
1291 return r;
1292 }
1293
1294 if (children.find(c_image_id) == children.end()) {
1295 CLS_LOG(20, "remove_child: child not found: %s", c_image_id.c_str());
1296 return -ENOENT;
1297 }
1298 // find and remove child
1299 children.erase(c_image_id);
1300
1301 // now empty? remove key altogether
1302 if (children.empty()) {
1303 r = cls_cxx_map_remove_key(hctx, key);
1304 if (r < 0)
1305 CLS_LOG(20, "remove_child: remove key failed: %s", cpp_strerror(r).c_str());
1306 } else {
1307 // write back shortened children list
1308 bufferlist childbl;
1309 ::encode(children, childbl);
1310 r = cls_cxx_map_set_val(hctx, key, &childbl);
1311 if (r < 0)
1312 CLS_LOG(20, "remove_child: write omap failed: %s", cpp_strerror(r).c_str());
1313 }
1314 return r;
1315 }
1316
1317 /**
1318 * Input:
1319 * @param p_pool_id parent pool id
1320 * @param p_image_id parent image oid
1321 * @param p_snap_id parent snapshot id
1322 * @param c_image_id new child image oid to add
1323 *
1324 * Output:
1325 * @param children set<string> of children
1326 *
1327 * @returns 0 on success, negative error on failure
1328 */
1329 int get_children(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1330 {
1331 int r;
1332 uint64_t p_pool_id;
1333 snapid_t p_snap_id;
1334 string p_image_id;
1335 std::set<string> children;
1336
1337 r = decode_parent(in, &p_pool_id, &p_image_id, &p_snap_id);
1338 if (r < 0)
1339 return r;
1340
1341 CLS_LOG(20, "get_children of (%" PRIu64 ", %s, %" PRIu64 ")",
1342 p_pool_id, p_image_id.c_str(), p_snap_id.val);
1343
1344 string key = parent_key(p_pool_id, p_image_id, p_snap_id);
1345
1346 r = read_key(hctx, key, &children);
1347 if (r < 0) {
1348 if (r != -ENOENT)
1349 CLS_LOG(20, "get_children: read omap failed: %s", cpp_strerror(r).c_str());
1350 return r;
1351 }
1352 ::encode(children, *out);
1353 return 0;
1354 }
1355
1356
1357 /**
1358 * Get the information needed to create a rados snap context for doing
1359 * I/O to the data objects. This must include all snapshots.
1360 *
1361 * Output:
1362 * @param snap_seq the highest snapshot id ever associated with the image (uint64_t)
1363 * @param snap_ids existing snapshot ids in descending order (vector<uint64_t>)
1364 * @returns 0 on success, negative error code on failure
1365 */
1366 int get_snapcontext(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1367 {
1368 CLS_LOG(20, "get_snapcontext");
1369
1370 int r;
1371 int max_read = RBD_MAX_KEYS_READ;
1372 vector<snapid_t> snap_ids;
1373 string last_read = RBD_SNAP_KEY_PREFIX;
1374 bool more;
1375
1376 do {
1377 set<string> keys;
1378 r = cls_cxx_map_get_keys(hctx, last_read, max_read, &keys, &more);
1379 if (r < 0)
1380 return r;
1381
1382 for (set<string>::const_iterator it = keys.begin();
1383 it != keys.end(); ++it) {
1384 if ((*it).find(RBD_SNAP_KEY_PREFIX) != 0)
1385 break;
1386 snapid_t snap_id = snap_id_from_key(*it);
1387 snap_ids.push_back(snap_id);
1388 }
1389 if (!keys.empty())
1390 last_read = *(keys.rbegin());
1391 } while (more);
1392
1393 uint64_t snap_seq;
1394 r = read_key(hctx, "snap_seq", &snap_seq);
1395 if (r < 0) {
1396 CLS_ERR("could not read the image's snap_seq off disk: %s", cpp_strerror(r).c_str());
1397 return r;
1398 }
1399
1400 // snap_ids must be descending in a snap context
1401 std::reverse(snap_ids.begin(), snap_ids.end());
1402
1403 ::encode(snap_seq, *out);
1404 ::encode(snap_ids, *out);
1405
1406 return 0;
1407 }
1408
1409 /**
1410 * Output:
1411 * @param object_prefix prefix for data object names (string)
1412 * @returns 0 on success, negative error code on failure
1413 */
1414 int get_object_prefix(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1415 {
1416 CLS_LOG(20, "get_object_prefix");
1417
1418 string object_prefix;
1419 int r = read_key(hctx, "object_prefix", &object_prefix);
1420 if (r < 0) {
1421 CLS_ERR("failed to read the image's object prefix off of disk: %s",
1422 cpp_strerror(r).c_str());
1423 return r;
1424 }
1425
1426 ::encode(object_prefix, *out);
1427
1428 return 0;
1429 }
1430
1431 /**
1432 * Input:
1433 * none
1434 *
1435 * Output:
1436 * @param pool_id (int64_t) of data pool or -1 if none
1437 * @returns 0 on success, negative error code on failure
1438 */
1439 int get_data_pool(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1440 {
1441 CLS_LOG(20, "get_data_pool");
1442
1443 int64_t data_pool_id = -1;
1444 int r = read_key(hctx, "data_pool_id", &data_pool_id);
1445 if (r == -ENOENT) {
1446 data_pool_id = -1;
1447 } else if (r < 0) {
1448 CLS_ERR("error reading image data pool id: %s", cpp_strerror(r).c_str());
1449 return r;
1450 }
1451
1452 ::encode(data_pool_id, *out);
1453 return 0;
1454 }
1455
1456 int get_snapshot_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1457 {
1458 uint64_t snap_id;
1459
1460 bufferlist::iterator iter = in->begin();
1461 try {
1462 ::decode(snap_id, iter);
1463 } catch (const buffer::error &err) {
1464 return -EINVAL;
1465 }
1466
1467 CLS_LOG(20, "get_snapshot_name snap_id=%llu", (unsigned long long)snap_id);
1468
1469 if (snap_id == CEPH_NOSNAP)
1470 return -EINVAL;
1471
1472 cls_rbd_snap snap;
1473 string snapshot_key;
1474 key_from_snap_id(snap_id, &snapshot_key);
1475 int r = read_key(hctx, snapshot_key, &snap);
1476 if (r < 0)
1477 return r;
1478
1479 ::encode(snap.name, *out);
1480
1481 return 0;
1482 }
1483
1484 int get_snapshot_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1485 {
1486 uint64_t snap_id;
1487
1488 bufferlist::iterator iter = in->begin();
1489 try {
1490 ::decode(snap_id, iter);
1491 } catch (const buffer::error &err) {
1492 return -EINVAL;
1493 }
1494
1495 CLS_LOG(20, "get_snapshot_timestamp snap_id=%llu", (unsigned long long)snap_id);
1496
1497 if (snap_id == CEPH_NOSNAP) {
1498 return -EINVAL;
1499 }
1500
1501 cls_rbd_snap snap;
1502 string snapshot_key;
1503 key_from_snap_id(snap_id, &snapshot_key);
1504 int r = read_key(hctx, snapshot_key, &snap);
1505 if (r < 0) {
1506 return r;
1507 }
1508
1509 ::encode(snap.timestamp, *out);
1510 return 0;
1511 }
1512
1513 /**
1514 * Retrieve namespace of a snapshot.
1515 *
1516 * Input:
1517 * @param snap_id id of the snapshot (uint64_t)
1518 *
1519 * Output:
1520 * @param SnapshotNamespace
1521 * @returns 0 on success, negative error code on failure.
1522 */
1523 int get_snapshot_namespace(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1524 {
1525 uint64_t snap_id;
1526
1527 bufferlist::iterator iter = in->begin();
1528 try {
1529 ::decode(snap_id, iter);
1530 } catch (const buffer::error &err) {
1531 return -EINVAL;
1532 }
1533
1534 CLS_LOG(20, "get_snapshot_namespace snap_id=%" PRIu64, snap_id);
1535
1536 if (snap_id == CEPH_NOSNAP) {
1537 return -EINVAL;
1538 }
1539
1540 cls_rbd_snap snap;
1541 string snapshot_key;
1542 key_from_snap_id(snap_id, &snapshot_key);
1543 int r = read_key(hctx, snapshot_key, &snap);
1544 if (r < 0) {
1545 return r;
1546 }
1547
1548 ::encode(snap.snapshot_namespace, *out);
1549
1550 return 0;
1551 }
1552
1553 /**
1554 * Adds a snapshot to an rbd header. Ensures the id and name are unique.
1555 *
1556 * Input:
1557 * @param snap_name name of the snapshot (string)
1558 * @param snap_id id of the snapshot (uint64_t)
1559 * @param snap_namespace namespace of the snapshot (cls::rbd::SnapshotNamespaceOnDisk)
1560 *
1561 * Output:
1562 * @returns 0 on success, negative error code on failure.
1563 * @returns -ESTALE if the input snap_id is less than the image's snap_seq
1564 * @returns -EEXIST if the id or name are already used by another snapshot
1565 */
1566 int snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1567 {
1568 bufferlist snap_namebl, snap_idbl;
1569 cls_rbd_snap snap_meta;
1570 uint64_t snap_limit;
1571
1572 try {
1573 bufferlist::iterator iter = in->begin();
1574 ::decode(snap_meta.name, iter);
1575 ::decode(snap_meta.id, iter);
1576 if (!iter.end()) {
1577 ::decode(snap_meta.snapshot_namespace, iter);
1578 }
1579 } catch (const buffer::error &err) {
1580 return -EINVAL;
1581 }
1582
1583 if (boost::get<cls::rbd::UnknownSnapshotNamespace>(
1584 &snap_meta.snapshot_namespace.snapshot_namespace) != nullptr) {
1585 CLS_ERR("Unknown snapshot namespace provided");
1586 return -EINVAL;
1587 }
1588
1589 CLS_LOG(20, "snapshot_add name=%s id=%llu", snap_meta.name.c_str(),
1590 (unsigned long long)snap_meta.id.val);
1591
1592 if (snap_meta.id > CEPH_MAXSNAP)
1593 return -EINVAL;
1594
1595 uint64_t cur_snap_seq;
1596 int r = read_key(hctx, "snap_seq", &cur_snap_seq);
1597 if (r < 0) {
1598 CLS_ERR("Could not read image's snap_seq off disk: %s", cpp_strerror(r).c_str());
1599 return r;
1600 }
1601
1602 // client lost a race with another snapshot creation.
1603 // snap_seq must be monotonically increasing.
1604 if (snap_meta.id < cur_snap_seq)
1605 return -ESTALE;
1606
1607 r = read_key(hctx, "size", &snap_meta.image_size);
1608 if (r < 0) {
1609 CLS_ERR("Could not read image's size off disk: %s", cpp_strerror(r).c_str());
1610 return r;
1611 }
1612 r = read_key(hctx, "features", &snap_meta.features);
1613 if (r < 0) {
1614 CLS_ERR("Could not read image's features off disk: %s", cpp_strerror(r).c_str());
1615 return r;
1616 }
1617 r = read_key(hctx, "flags", &snap_meta.flags);
1618 if (r < 0 && r != -ENOENT) {
1619 CLS_ERR("Could not read image's flags off disk: %s", cpp_strerror(r).c_str());
1620 return r;
1621 }
1622
1623 r = read_key(hctx, "snap_limit", &snap_limit);
1624 if (r == -ENOENT) {
1625 snap_limit = UINT64_MAX;
1626 } else if (r < 0) {
1627 CLS_ERR("Could not read snapshot limit off disk: %s", cpp_strerror(r).c_str());
1628 return r;
1629 }
1630
1631 snap_meta.timestamp = ceph_clock_now();
1632
1633 int max_read = RBD_MAX_KEYS_READ;
1634 uint64_t total_read = 0;
1635 string last_read = RBD_SNAP_KEY_PREFIX;
1636 bool more;
1637 do {
1638 map<string, bufferlist> vals;
1639 r = cls_cxx_map_get_vals(hctx, last_read, RBD_SNAP_KEY_PREFIX,
1640 max_read, &vals, &more);
1641 if (r < 0)
1642 return r;
1643
1644 total_read += vals.size();
1645 if (total_read >= snap_limit) {
1646 CLS_ERR("Attempt to create snapshot over limit of %" PRIu64, snap_limit);
1647 return -EDQUOT;
1648 }
1649
1650 for (map<string, bufferlist>::iterator it = vals.begin();
1651 it != vals.end(); ++it) {
1652 cls_rbd_snap old_meta;
1653 bufferlist::iterator iter = it->second.begin();
1654 try {
1655 ::decode(old_meta, iter);
1656 } catch (const buffer::error &err) {
1657 snapid_t snap_id = snap_id_from_key(it->first);
1658 CLS_ERR("error decoding snapshot metadata for snap_id: %llu",
1659 (unsigned long long)snap_id.val);
1660 return -EIO;
1661 }
1662 if ((snap_meta.name == old_meta.name &&
1663 snap_meta.snapshot_namespace == old_meta.snapshot_namespace) ||
1664 snap_meta.id == old_meta.id) {
1665 CLS_LOG(20, "snap_name %s or snap_id %llu matches existing snap %s %llu",
1666 snap_meta.name.c_str(), (unsigned long long)snap_meta.id.val,
1667 old_meta.name.c_str(), (unsigned long long)old_meta.id.val);
1668 return -EEXIST;
1669 }
1670 }
1671
1672 if (!vals.empty())
1673 last_read = vals.rbegin()->first;
1674 } while (more);
1675
1676 // snapshot inherits parent, if any
1677 cls_rbd_parent parent;
1678 r = read_key(hctx, "parent", &parent);
1679 if (r < 0 && r != -ENOENT)
1680 return r;
1681 if (r == 0) {
1682 snap_meta.parent = parent;
1683 }
1684
1685 bufferlist snap_metabl, snap_seqbl;
1686 ::encode(snap_meta, snap_metabl);
1687 ::encode(snap_meta.id, snap_seqbl);
1688
1689 string snapshot_key;
1690 key_from_snap_id(snap_meta.id, &snapshot_key);
1691 map<string, bufferlist> vals;
1692 vals["snap_seq"] = snap_seqbl;
1693 vals[snapshot_key] = snap_metabl;
1694 r = cls_cxx_map_set_vals(hctx, &vals);
1695 if (r < 0) {
1696 CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
1697 return r;
1698 }
1699
1700 return 0;
1701 }
1702
1703
1704 /**
1705 * rename snapshot .
1706 *
1707 * Input:
1708 * @param src_snap_id old snap id of the snapshot (snapid_t)
1709 * @param dst_snap_name new name of the snapshot (string)
1710 *
1711 * Output:
1712 * @returns 0 on success, negative error code on failure.
1713 */
1714 int snapshot_rename(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1715 {
1716 bufferlist snap_namebl, snap_idbl;
1717 snapid_t src_snap_id;
1718 string src_snap_key,dst_snap_name;
1719 cls_rbd_snap snap_meta;
1720 int r;
1721
1722 try {
1723 bufferlist::iterator iter = in->begin();
1724 ::decode(src_snap_id, iter);
1725 ::decode(dst_snap_name, iter);
1726 } catch (const buffer::error &err) {
1727 return -EINVAL;
1728 }
1729
1730 CLS_LOG(20, "snapshot_rename id=%llu dst_name=%s", (unsigned long long)src_snap_id.val,
1731 dst_snap_name.c_str());
1732
1733 int max_read = RBD_MAX_KEYS_READ;
1734 string last_read = RBD_SNAP_KEY_PREFIX;
1735 bool more;
1736 do {
1737 map<string, bufferlist> vals;
1738 r = cls_cxx_map_get_vals(hctx, last_read, RBD_SNAP_KEY_PREFIX,
1739 max_read, &vals, &more);
1740 if (r < 0)
1741 return r;
1742
1743 for (map<string, bufferlist>::iterator it = vals.begin();
1744 it != vals.end(); ++it) {
1745 bufferlist::iterator iter = it->second.begin();
1746 try {
1747 ::decode(snap_meta, iter);
1748 } catch (const buffer::error &err) {
1749 CLS_ERR("error decoding snapshot metadata for snap : %s",
1750 dst_snap_name.c_str());
1751 return -EIO;
1752 }
1753 if (dst_snap_name == snap_meta.name) {
1754 CLS_LOG(20, "snap_name %s matches existing snap with snap id = %llu ",
1755 dst_snap_name.c_str(), (unsigned long long)snap_meta.id.val);
1756 return -EEXIST;
1757 }
1758 }
1759 if (!vals.empty())
1760 last_read = vals.rbegin()->first;
1761 } while (more);
1762
1763 key_from_snap_id(src_snap_id, &src_snap_key);
1764 r = read_key(hctx, src_snap_key, &snap_meta);
1765 if (r == -ENOENT) {
1766 CLS_LOG(20, "cannot find existing snap with snap id = %llu ", (unsigned long long)src_snap_id);
1767 return r;
1768 }
1769 snap_meta.name = dst_snap_name;
1770 bufferlist snap_metabl;
1771 ::encode(snap_meta, snap_metabl);
1772
1773 r = cls_cxx_map_set_val(hctx, src_snap_key, &snap_metabl);
1774 if (r < 0) {
1775 CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
1776 return r;
1777 }
1778
1779 return 0;
1780 }
1781 /**
1782 * Removes a snapshot from an rbd header.
1783 *
1784 * Input:
1785 * @param snap_id the id of the snapshot to remove (uint64_t)
1786 *
1787 * Output:
1788 * @returns 0 on success, negative error code on failure
1789 */
1790 int snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1791 {
1792 snapid_t snap_id;
1793
1794 try {
1795 bufferlist::iterator iter = in->begin();
1796 ::decode(snap_id, iter);
1797 } catch (const buffer::error &err) {
1798 return -EINVAL;
1799 }
1800
1801 CLS_LOG(20, "snapshot_remove id=%llu", (unsigned long long)snap_id.val);
1802
1803 // check if the key exists. we can't rely on remove_key doing this for
1804 // us, since OMAPRMKEYS returns success if the key is not there.
1805 // bug or feature? sounds like a bug, since tmap did not have this
1806 // behavior, but cls_rgw may rely on it...
1807 cls_rbd_snap snap;
1808 string snapshot_key;
1809 key_from_snap_id(snap_id, &snapshot_key);
1810 int r = read_key(hctx, snapshot_key, &snap);
1811 if (r == -ENOENT)
1812 return -ENOENT;
1813
1814 if (snap.protection_status != RBD_PROTECTION_STATUS_UNPROTECTED)
1815 return -EBUSY;
1816
1817 r = cls_cxx_map_remove_key(hctx, snapshot_key);
1818 if (r < 0) {
1819 CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
1820 return r;
1821 }
1822
1823 return 0;
1824 }
1825
1826 /**
1827 * Returns a uint64_t of all the features supported by this class.
1828 */
1829 int get_all_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1830 {
1831 uint64_t all_features = RBD_FEATURES_ALL;
1832 ::encode(all_features, *out);
1833 return 0;
1834 }
1835
1836 /**
1837 * "Copy up" data from the parent of a clone to the clone's object(s).
1838 * Used for implementing copy-on-write for a clone image. Client
1839 * will pass down a chunk of data that fits completely within one
1840 * clone block (one object), and is aligned (starts at beginning of block),
1841 * but may be shorter (for non-full parent blocks). The class method
1842 * can't know the object size to validate the requested length,
1843 * so it just writes the data as given if the child object doesn't
1844 * already exist, and returns success if it does.
1845 *
1846 * Input:
1847 * @param in bufferlist of data to write
1848 *
1849 * Output:
1850 * @returns 0 on success, or if block already exists in child
1851 * negative error code on other error
1852 */
1853
1854 int copyup(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1855 {
1856 // check for existence; if child object exists, just return success
1857 if (cls_cxx_stat(hctx, NULL, NULL) == 0)
1858 return 0;
1859 CLS_LOG(20, "copyup: writing length %d\n", in->length());
1860 return cls_cxx_write(hctx, 0, in->length(), in);
1861 }
1862
1863
1864 /************************ rbd_id object methods **************************/
1865
1866 /**
1867 * Input:
1868 * @param in ignored
1869 *
1870 * Output:
1871 * @param id the id stored in the object
1872 * @returns 0 on success, negative error code on failure
1873 */
1874 int get_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1875 {
1876 uint64_t size;
1877 int r = cls_cxx_stat(hctx, &size, NULL);
1878 if (r < 0)
1879 return r;
1880
1881 if (size == 0)
1882 return -ENOENT;
1883
1884 bufferlist read_bl;
1885 r = cls_cxx_read(hctx, 0, size, &read_bl);
1886 if (r < 0) {
1887 CLS_ERR("get_id: could not read id: %s", cpp_strerror(r).c_str());
1888 return r;
1889 }
1890
1891 string id;
1892 try {
1893 bufferlist::iterator iter = read_bl.begin();
1894 ::decode(id, iter);
1895 } catch (const buffer::error &err) {
1896 return -EIO;
1897 }
1898
1899 ::encode(id, *out);
1900 return 0;
1901 }
1902
1903 /**
1904 * Set the id of an image. The object must already exist.
1905 *
1906 * Input:
1907 * @param id the id of the image, as an alpha-numeric string
1908 *
1909 * Output:
1910 * @returns 0 on success, -EEXIST if the atomic create fails,
1911 * negative error code on other error
1912 */
1913 int set_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1914 {
1915 int r = check_exists(hctx);
1916 if (r < 0)
1917 return r;
1918
1919 string id;
1920 try {
1921 bufferlist::iterator iter = in->begin();
1922 ::decode(id, iter);
1923 } catch (const buffer::error &err) {
1924 return -EINVAL;
1925 }
1926
1927 if (!is_valid_id(id)) {
1928 CLS_ERR("set_id: invalid id '%s'", id.c_str());
1929 return -EINVAL;
1930 }
1931
1932 uint64_t size;
1933 r = cls_cxx_stat(hctx, &size, NULL);
1934 if (r < 0)
1935 return r;
1936 if (size != 0)
1937 return -EEXIST;
1938
1939 CLS_LOG(20, "set_id: id=%s", id.c_str());
1940
1941 bufferlist write_bl;
1942 ::encode(id, write_bl);
1943 return cls_cxx_write(hctx, 0, write_bl.length(), &write_bl);
1944 }
1945
1946 /*********************** methods for rbd_directory ***********************/
1947
1948 static const string dir_key_for_id(const string &id)
1949 {
1950 return RBD_DIR_ID_KEY_PREFIX + id;
1951 }
1952
1953 static const string dir_key_for_name(const string &name)
1954 {
1955 return RBD_DIR_NAME_KEY_PREFIX + name;
1956 }
1957
1958 static const string dir_name_from_key(const string &key)
1959 {
1960 return key.substr(strlen(RBD_DIR_NAME_KEY_PREFIX));
1961 }
1962
1963 static int dir_add_image_helper(cls_method_context_t hctx,
1964 const string &name, const string &id,
1965 bool check_for_unique_id)
1966 {
1967 if (!name.size() || !is_valid_id(id)) {
1968 CLS_ERR("dir_add_image_helper: invalid name '%s' or id '%s'",
1969 name.c_str(), id.c_str());
1970 return -EINVAL;
1971 }
1972
1973 CLS_LOG(20, "dir_add_image_helper name=%s id=%s", name.c_str(), id.c_str());
1974
1975 string tmp;
1976 string name_key = dir_key_for_name(name);
1977 string id_key = dir_key_for_id(id);
1978 int r = read_key(hctx, name_key, &tmp);
1979 if (r != -ENOENT) {
1980 CLS_LOG(10, "name already exists");
1981 return -EEXIST;
1982 }
1983 r = read_key(hctx, id_key, &tmp);
1984 if (r != -ENOENT && check_for_unique_id) {
1985 CLS_LOG(10, "id already exists");
1986 return -EBADF;
1987 }
1988 bufferlist id_bl, name_bl;
1989 ::encode(id, id_bl);
1990 ::encode(name, name_bl);
1991 map<string, bufferlist> omap_vals;
1992 omap_vals[name_key] = id_bl;
1993 omap_vals[id_key] = name_bl;
1994 return cls_cxx_map_set_vals(hctx, &omap_vals);
1995 }
1996
1997 static int dir_remove_image_helper(cls_method_context_t hctx,
1998 const string &name, const string &id)
1999 {
2000 CLS_LOG(20, "dir_remove_image_helper name=%s id=%s",
2001 name.c_str(), id.c_str());
2002
2003 string stored_name, stored_id;
2004 string name_key = dir_key_for_name(name);
2005 string id_key = dir_key_for_id(id);
2006 int r = read_key(hctx, name_key, &stored_id);
2007 if (r < 0) {
2008 if (r != -ENOENT)
2009 CLS_ERR("error reading name to id mapping: %s", cpp_strerror(r).c_str());
2010 return r;
2011 }
2012 r = read_key(hctx, id_key, &stored_name);
2013 if (r < 0) {
2014 CLS_ERR("error reading id to name mapping: %s", cpp_strerror(r).c_str());
2015 return r;
2016 }
2017
2018 // check if this op raced with a rename
2019 if (stored_name != name || stored_id != id) {
2020 CLS_ERR("stored name '%s' and id '%s' do not match args '%s' and '%s'",
2021 stored_name.c_str(), stored_id.c_str(), name.c_str(), id.c_str());
2022 return -ESTALE;
2023 }
2024
2025 r = cls_cxx_map_remove_key(hctx, name_key);
2026 if (r < 0) {
2027 CLS_ERR("error removing name: %s", cpp_strerror(r).c_str());
2028 return r;
2029 }
2030
2031 r = cls_cxx_map_remove_key(hctx, id_key);
2032 if (r < 0) {
2033 CLS_ERR("error removing id: %s", cpp_strerror(r).c_str());
2034 return r;
2035 }
2036
2037 return 0;
2038 }
2039
2040 /**
2041 * Rename an image in the directory, updating both indexes
2042 * atomically. This can't be done from the client calling
2043 * dir_add_image and dir_remove_image in one transaction because the
2044 * results of the first method are not visibale to later steps.
2045 *
2046 * Input:
2047 * @param src original name of the image
2048 * @param dest new name of the image
2049 * @param id the id of the image
2050 *
2051 * Output:
2052 * @returns -ESTALE if src and id do not map to each other
2053 * @returns -ENOENT if src or id are not in the directory
2054 * @returns -EEXIST if dest already exists
2055 * @returns 0 on success, negative error code on failure
2056 */
2057 int dir_rename_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2058 {
2059 string src, dest, id;
2060 try {
2061 bufferlist::iterator iter = in->begin();
2062 ::decode(src, iter);
2063 ::decode(dest, iter);
2064 ::decode(id, iter);
2065 } catch (const buffer::error &err) {
2066 return -EINVAL;
2067 }
2068
2069 int r = dir_remove_image_helper(hctx, src, id);
2070 if (r < 0)
2071 return r;
2072 // ignore duplicate id because the result of
2073 // remove_image_helper is not visible yet
2074 return dir_add_image_helper(hctx, dest, id, false);
2075 }
2076
2077 /**
2078 * Get the id of an image given its name.
2079 *
2080 * Input:
2081 * @param name the name of the image
2082 *
2083 * Output:
2084 * @param id the id of the image
2085 * @returns 0 on success, negative error code on failure
2086 */
2087 int dir_get_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2088 {
2089 string name;
2090
2091 try {
2092 bufferlist::iterator iter = in->begin();
2093 ::decode(name, iter);
2094 } catch (const buffer::error &err) {
2095 return -EINVAL;
2096 }
2097
2098 CLS_LOG(20, "dir_get_id: name=%s", name.c_str());
2099
2100 string id;
2101 int r = read_key(hctx, dir_key_for_name(name), &id);
2102 if (r < 0) {
2103 if (r != -ENOENT)
2104 CLS_ERR("error reading id for name '%s': %s", name.c_str(), cpp_strerror(r).c_str());
2105 return r;
2106 }
2107 ::encode(id, *out);
2108 return 0;
2109 }
2110
2111 /**
2112 * Get the name of an image given its id.
2113 *
2114 * Input:
2115 * @param id the id of the image
2116 *
2117 * Output:
2118 * @param name the name of the image
2119 * @returns 0 on success, negative error code on failure
2120 */
2121 int dir_get_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2122 {
2123 string id;
2124
2125 try {
2126 bufferlist::iterator iter = in->begin();
2127 ::decode(id, iter);
2128 } catch (const buffer::error &err) {
2129 return -EINVAL;
2130 }
2131
2132 CLS_LOG(20, "dir_get_name: id=%s", id.c_str());
2133
2134 string name;
2135 int r = read_key(hctx, dir_key_for_id(id), &name);
2136 if (r < 0) {
2137 CLS_ERR("error reading name for id '%s': %s", id.c_str(), cpp_strerror(r).c_str());
2138 return r;
2139 }
2140 ::encode(name, *out);
2141 return 0;
2142 }
2143
2144 /**
2145 * List the names and ids of the images in the directory, sorted by
2146 * name.
2147 *
2148 * Input:
2149 * @param start_after which name to begin listing after
2150 * (use the empty string to start at the beginning)
2151 * @param max_return the maximum number of names to list
2152 *
2153 * Output:
2154 * @param images map from name to id of up to max_return images
2155 * @returns 0 on success, negative error code on failure
2156 */
2157 int dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2158 {
2159 string start_after;
2160 uint64_t max_return;
2161
2162 try {
2163 bufferlist::iterator iter = in->begin();
2164 ::decode(start_after, iter);
2165 ::decode(max_return, iter);
2166 } catch (const buffer::error &err) {
2167 return -EINVAL;
2168 }
2169
2170 int max_read = RBD_MAX_KEYS_READ;
2171 map<string, string> images;
2172 string last_read = dir_key_for_name(start_after);
2173 bool more = true;
2174
2175 while (more && images.size() < max_return) {
2176 map<string, bufferlist> vals;
2177 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
2178 int r = cls_cxx_map_get_vals(hctx, last_read, RBD_DIR_NAME_KEY_PREFIX,
2179 max_read, &vals, &more);
2180 if (r < 0) {
2181 CLS_ERR("error reading directory by name: %s", cpp_strerror(r).c_str());
2182 return r;
2183 }
2184
2185 for (map<string, bufferlist>::iterator it = vals.begin();
2186 it != vals.end(); ++it) {
2187 string id;
2188 bufferlist::iterator iter = it->second.begin();
2189 try {
2190 ::decode(id, iter);
2191 } catch (const buffer::error &err) {
2192 CLS_ERR("could not decode id of image '%s'", it->first.c_str());
2193 return -EIO;
2194 }
2195 CLS_LOG(20, "adding '%s' -> '%s'", dir_name_from_key(it->first).c_str(), id.c_str());
2196 images[dir_name_from_key(it->first)] = id;
2197 if (images.size() >= max_return)
2198 break;
2199 }
2200 if (!vals.empty()) {
2201 last_read = dir_key_for_name(images.rbegin()->first);
2202 }
2203 }
2204
2205 ::encode(images, *out);
2206
2207 return 0;
2208 }
2209
2210 /**
2211 * Add an image to the rbd directory. Creates the directory object if
2212 * needed, and updates the index from id to name and name to id.
2213 *
2214 * Input:
2215 * @param name the name of the image
2216 * @param id the id of the image
2217 *
2218 * Output:
2219 * @returns -EEXIST if the image name is already in the directory
2220 * @returns -EBADF if the image id is already in the directory
2221 * @returns 0 on success, negative error code on failure
2222 */
2223 int dir_add_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2224 {
2225 int r = cls_cxx_create(hctx, false);
2226 if (r < 0) {
2227 CLS_ERR("could not create directory: %s", cpp_strerror(r).c_str());
2228 return r;
2229 }
2230
2231 string name, id;
2232 try {
2233 bufferlist::iterator iter = in->begin();
2234 ::decode(name, iter);
2235 ::decode(id, iter);
2236 } catch (const buffer::error &err) {
2237 return -EINVAL;
2238 }
2239
2240 return dir_add_image_helper(hctx, name, id, true);
2241 }
2242
2243 /**
2244 * Remove an image from the rbd directory.
2245 *
2246 * Input:
2247 * @param name the name of the image
2248 * @param id the id of the image
2249 *
2250 * Output:
2251 * @returns -ESTALE if the name and id do not map to each other
2252 * @returns 0 on success, negative error code on failure
2253 */
2254 int dir_remove_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2255 {
2256 string name, id;
2257 try {
2258 bufferlist::iterator iter = in->begin();
2259 ::decode(name, iter);
2260 ::decode(id, iter);
2261 } catch (const buffer::error &err) {
2262 return -EINVAL;
2263 }
2264
2265 return dir_remove_image_helper(hctx, name, id);
2266 }
2267
2268 int object_map_read(cls_method_context_t hctx, BitVector<2> &object_map)
2269 {
2270 uint64_t size;
2271 int r = cls_cxx_stat(hctx, &size, NULL);
2272 if (r < 0) {
2273 return r;
2274 }
2275 if (size == 0) {
2276 return -ENOENT;
2277 }
2278
2279 bufferlist bl;
2280 r = cls_cxx_read(hctx, 0, size, &bl);
2281 if (r < 0) {
2282 return r;
2283 }
2284
2285 try {
2286 bufferlist::iterator iter = bl.begin();
2287 ::decode(object_map, iter);
2288 } catch (const buffer::error &err) {
2289 CLS_ERR("failed to decode object map: %s", err.what());
2290 return -EINVAL;
2291 }
2292 return 0;
2293 }
2294
2295 /**
2296 * Load an rbd image's object map
2297 *
2298 * Input:
2299 * none
2300 *
2301 * Output:
2302 * @param object map bit vector
2303 * @returns 0 on success, negative error code on failure
2304 */
2305 int object_map_load(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2306 {
2307 BitVector<2> object_map;
2308 int r = object_map_read(hctx, object_map);
2309 if (r < 0) {
2310 return r;
2311 }
2312
2313 object_map.set_crc_enabled(false);
2314 ::encode(object_map, *out);
2315 return 0;
2316 }
2317
2318 /**
2319 * Save an rbd image's object map
2320 *
2321 * Input:
2322 * @param object map bit vector
2323 *
2324 * Output:
2325 * @returns 0 on success, negative error code on failure
2326 */
2327 int object_map_save(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2328 {
2329 BitVector<2> object_map;
2330 try {
2331 bufferlist::iterator iter = in->begin();
2332 ::decode(object_map, iter);
2333 } catch (const buffer::error &err) {
2334 return -EINVAL;
2335 }
2336
2337 object_map.set_crc_enabled(true);
2338
2339 bufferlist bl;
2340 ::encode(object_map, bl);
2341 CLS_LOG(20, "object_map_save: object size=%" PRIu64 ", byte size=%u",
2342 object_map.size(), bl.length());
2343 return cls_cxx_write_full(hctx, &bl);
2344 }
2345
2346 /**
2347 * Resize an rbd image's object map
2348 *
2349 * Input:
2350 * @param object_count the max number of objects in the image
2351 * @param default_state the default state of newly created objects
2352 *
2353 * Output:
2354 * @returns 0 on success, negative error code on failure
2355 */
2356 int object_map_resize(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2357 {
2358 uint64_t object_count;
2359 uint8_t default_state;
2360 try {
2361 bufferlist::iterator iter = in->begin();
2362 ::decode(object_count, iter);
2363 ::decode(default_state, iter);
2364 } catch (const buffer::error &err) {
2365 return -EINVAL;
2366 }
2367
2368 // protect against excessive memory requirements
2369 if (object_count > cls::rbd::MAX_OBJECT_MAP_OBJECT_COUNT) {
2370 CLS_ERR("object map too large: %" PRIu64, object_count);
2371 return -EINVAL;
2372 }
2373
2374 BitVector<2> object_map;
2375 int r = object_map_read(hctx, object_map);
2376 if ((r < 0) && (r != -ENOENT)) {
2377 return r;
2378 }
2379
2380 size_t orig_object_map_size = object_map.size();
2381 if (object_count < orig_object_map_size) {
2382 for (uint64_t i = object_count + 1; i < orig_object_map_size; ++i) {
2383 if (object_map[i] != default_state) {
2384 CLS_ERR("object map indicates object still exists: %" PRIu64, i);
2385 return -ESTALE;
2386 }
2387 }
2388 object_map.resize(object_count);
2389 } else if (object_count > orig_object_map_size) {
2390 object_map.resize(object_count);
2391 for (uint64_t i = orig_object_map_size; i < object_count; ++i) {
2392 object_map[i] = default_state;
2393 }
2394 }
2395
2396 bufferlist map;
2397 ::encode(object_map, map);
2398 CLS_LOG(20, "object_map_resize: object size=%" PRIu64 ", byte size=%u",
2399 object_count, map.length());
2400 return cls_cxx_write_full(hctx, &map);
2401 }
2402
2403 /**
2404 * Update an rbd image's object map
2405 *
2406 * Input:
2407 * @param start_object_no the start object iterator
2408 * @param end_object_no the end object iterator
2409 * @param new_object_state the new object state
2410 * @param current_object_state optional current object state filter
2411 *
2412 * Output:
2413 * @returns 0 on success, negative error code on failure
2414 */
2415 int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2416 {
2417 uint64_t start_object_no;
2418 uint64_t end_object_no;
2419 uint8_t new_object_state;
2420 boost::optional<uint8_t> current_object_state;
2421 try {
2422 bufferlist::iterator iter = in->begin();
2423 ::decode(start_object_no, iter);
2424 ::decode(end_object_no, iter);
2425 ::decode(new_object_state, iter);
2426 ::decode(current_object_state, iter);
2427 } catch (const buffer::error &err) {
2428 CLS_ERR("failed to decode message");
2429 return -EINVAL;
2430 }
2431
2432 uint64_t size;
2433 int r = cls_cxx_stat(hctx, &size, NULL);
2434 if (r < 0) {
2435 return r;
2436 }
2437
2438 BitVector<2> object_map;
2439 bufferlist header_bl;
2440 r = cls_cxx_read2(hctx, 0, object_map.get_header_length(), &header_bl,
2441 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2442 if (r < 0) {
2443 CLS_ERR("object map header read failed");
2444 return r;
2445 }
2446
2447 try {
2448 bufferlist::iterator it = header_bl.begin();
2449 object_map.decode_header(it);
2450 } catch (const buffer::error &err) {
2451 CLS_ERR("failed to decode object map header: %s", err.what());
2452 return -EINVAL;
2453 }
2454
2455 bufferlist footer_bl;
2456 r = cls_cxx_read2(hctx, object_map.get_footer_offset(),
2457 size - object_map.get_footer_offset(), &footer_bl,
2458 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2459 if (r < 0) {
2460 CLS_ERR("object map footer read failed");
2461 return r;
2462 }
2463
2464 try {
2465 bufferlist::iterator it = footer_bl.begin();
2466 object_map.decode_footer(it);
2467 } catch (const buffer::error &err) {
2468 CLS_ERR("failed to decode object map footer: %s", err.what());
2469 }
2470
2471 if (start_object_no >= end_object_no || end_object_no > object_map.size()) {
2472 return -ERANGE;
2473 }
2474
2475 uint64_t byte_offset;
2476 uint64_t byte_length;
2477 object_map.get_data_extents(start_object_no,
2478 end_object_no - start_object_no,
2479 &byte_offset, &byte_length);
2480
2481 bufferlist data_bl;
2482 r = cls_cxx_read2(hctx, object_map.get_header_length() + byte_offset,
2483 byte_length, &data_bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2484 if (r < 0) {
2485 CLS_ERR("object map data read failed");
2486 return r;
2487 }
2488
2489 try {
2490 bufferlist::iterator it = data_bl.begin();
2491 object_map.decode_data(it, byte_offset);
2492 } catch (const buffer::error &err) {
2493 CLS_ERR("failed to decode data chunk [%" PRIu64 "]: %s",
2494 byte_offset, err.what());
2495 return -EINVAL;
2496 }
2497
2498 bool updated = false;
2499 for (uint64_t object_no = start_object_no; object_no < end_object_no;
2500 ++object_no) {
2501 uint8_t state = object_map[object_no];
2502 if ((!current_object_state || state == *current_object_state ||
2503 (*current_object_state == OBJECT_EXISTS &&
2504 state == OBJECT_EXISTS_CLEAN)) && state != new_object_state) {
2505 object_map[object_no] = new_object_state;
2506 updated = true;
2507 }
2508 }
2509
2510 if (updated) {
2511 CLS_LOG(20, "object_map_update: %" PRIu64 "~%" PRIu64 " -> %" PRIu64,
2512 byte_offset, byte_length,
2513 object_map.get_header_length() + byte_offset);
2514
2515 bufferlist data_bl;
2516 object_map.encode_data(data_bl, byte_offset, byte_length);
2517 r = cls_cxx_write2(hctx, object_map.get_header_length() + byte_offset,
2518 data_bl.length(), &data_bl,
2519 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2520 if (r < 0) {
2521 CLS_ERR("failed to write object map header: %s", cpp_strerror(r).c_str());
2522 return r;
2523 }
2524
2525 footer_bl.clear();
2526 object_map.encode_footer(footer_bl);
2527 r = cls_cxx_write2(hctx, object_map.get_footer_offset(), footer_bl.length(),
2528 &footer_bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2529 if (r < 0) {
2530 CLS_ERR("failed to write object map footer: %s", cpp_strerror(r).c_str());
2531 return r;
2532 }
2533 } else {
2534 CLS_LOG(20, "object_map_update: no update necessary");
2535 }
2536
2537 return 0;
2538 }
2539
2540 /**
2541 * Mark all _EXISTS objects as _EXISTS_CLEAN so future writes to the
2542 * image HEAD can be tracked.
2543 *
2544 * Input:
2545 * none
2546 *
2547 * Output:
2548 * @returns 0 on success, negative error code on failure
2549 */
2550 int object_map_snap_add(cls_method_context_t hctx, bufferlist *in,
2551 bufferlist *out)
2552 {
2553 BitVector<2> object_map;
2554 int r = object_map_read(hctx, object_map);
2555 if (r < 0) {
2556 return r;
2557 }
2558
2559 bool updated = false;
2560 for (uint64_t i = 0; i < object_map.size(); ++i) {
2561 if (object_map[i] == OBJECT_EXISTS) {
2562 object_map[i] = OBJECT_EXISTS_CLEAN;
2563 updated = true;
2564 }
2565 }
2566
2567 if (updated) {
2568 bufferlist bl;
2569 ::encode(object_map, bl);
2570 r = cls_cxx_write_full(hctx, &bl);
2571 }
2572 return r;
2573 }
2574
2575 /**
2576 * Mark all _EXISTS_CLEAN objects as _EXISTS in the current object map
2577 * if the provided snapshot object map object is marked as _EXISTS.
2578 *
2579 * Input:
2580 * @param snapshot object map bit vector
2581 *
2582 * Output:
2583 * @returns 0 on success, negative error code on failure
2584 */
2585 int object_map_snap_remove(cls_method_context_t hctx, bufferlist *in,
2586 bufferlist *out)
2587 {
2588 BitVector<2> src_object_map;
2589 try {
2590 bufferlist::iterator iter = in->begin();
2591 ::decode(src_object_map, iter);
2592 } catch (const buffer::error &err) {
2593 return -EINVAL;
2594 }
2595
2596 BitVector<2> dst_object_map;
2597 int r = object_map_read(hctx, dst_object_map);
2598 if (r < 0) {
2599 return r;
2600 }
2601
2602 bool updated = false;
2603 for (uint64_t i = 0; i < dst_object_map.size(); ++i) {
2604 if (dst_object_map[i] == OBJECT_EXISTS_CLEAN &&
2605 (i >= src_object_map.size() || src_object_map[i] == OBJECT_EXISTS)) {
2606 dst_object_map[i] = OBJECT_EXISTS;
2607 updated = true;
2608 }
2609 }
2610
2611 if (updated) {
2612 bufferlist bl;
2613 ::encode(dst_object_map, bl);
2614 r = cls_cxx_write_full(hctx, &bl);
2615 }
2616 return r;
2617 }
2618
2619 static const string metadata_key_for_name(const string &name)
2620 {
2621 return RBD_METADATA_KEY_PREFIX + name;
2622 }
2623
2624 static const string metadata_name_from_key(const string &key)
2625 {
2626 return key.substr(strlen(RBD_METADATA_KEY_PREFIX));
2627 }
2628
2629 /**
2630 * Input:
2631 * @param start_after which name to begin listing after
2632 * (use the empty string to start at the beginning)
2633 * @param max_return the maximum number of names to list
2634
2635 * Output:
2636 * @param value
2637 * @returns 0 on success, negative error code on failure
2638 */
2639 int metadata_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2640 {
2641 string start_after;
2642 uint64_t max_return;
2643
2644 try {
2645 bufferlist::iterator iter = in->begin();
2646 ::decode(start_after, iter);
2647 ::decode(max_return, iter);
2648 } catch (const buffer::error &err) {
2649 return -EINVAL;
2650 }
2651
2652 // TODO remove implicit support for zero during the N-release
2653 if (max_return == 0) {
2654 max_return = RBD_MAX_KEYS_READ;
2655 }
2656
2657 map<string, bufferlist> data;
2658 string last_read = metadata_key_for_name(start_after);
2659 bool more = true;
2660
2661 while (more && data.size() < max_return) {
2662 map<string, bufferlist> raw_data;
2663 int max_read = MIN(RBD_MAX_KEYS_READ, max_return - data.size());
2664 int r = cls_cxx_map_get_vals(hctx, last_read, RBD_METADATA_KEY_PREFIX,
2665 max_read, &raw_data, &more);
2666 if (r < 0) {
2667 CLS_ERR("failed to read the vals off of disk: %s", cpp_strerror(r).c_str());
2668 return r;
2669 }
2670
2671 for (auto& kv : raw_data) {
2672 data[metadata_name_from_key(kv.first)].swap(kv.second);
2673 }
2674
2675 if (!raw_data.empty()) {
2676 last_read = raw_data.rbegin()->first;
2677 }
2678 }
2679
2680 ::encode(data, *out);
2681 return 0;
2682 }
2683
2684 /**
2685 * Input:
2686 * @param data <map(key, value)>
2687 *
2688 * Output:
2689 * @returns 0 on success, negative error code on failure
2690 */
2691 int metadata_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2692 {
2693 map<string, bufferlist> data, raw_data;
2694
2695 bufferlist::iterator iter = in->begin();
2696 try {
2697 ::decode(data, iter);
2698 } catch (const buffer::error &err) {
2699 return -EINVAL;
2700 }
2701
2702 for (map<string, bufferlist>::iterator it = data.begin();
2703 it != data.end(); ++it) {
2704 CLS_LOG(20, "metdata_set key=%s value=%.*s", it->first.c_str(),
2705 it->second.length(), it->second.c_str());
2706 raw_data[metadata_key_for_name(it->first)].swap(it->second);
2707 }
2708 int r = cls_cxx_map_set_vals(hctx, &raw_data);
2709 if (r < 0) {
2710 CLS_ERR("error writing metadata: %s", cpp_strerror(r).c_str());
2711 return r;
2712 }
2713
2714 return 0;
2715 }
2716
2717 /**
2718 * Input:
2719 * @param key
2720 *
2721 * Output:
2722 * @returns 0 on success, negative error code on failure
2723 */
2724 int metadata_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2725 {
2726 string key;
2727
2728 bufferlist::iterator iter = in->begin();
2729 try {
2730 ::decode(key, iter);
2731 } catch (const buffer::error &err) {
2732 return -EINVAL;
2733 }
2734
2735 CLS_LOG(20, "metdata_set key=%s", key.c_str());
2736
2737 int r = cls_cxx_map_remove_key(hctx, metadata_key_for_name(key));
2738 if (r < 0) {
2739 CLS_ERR("error remove metadata: %s", cpp_strerror(r).c_str());
2740 return r;
2741 }
2742
2743 return 0;
2744 }
2745
2746 /**
2747 * Input:
2748 * @param key
2749 *
2750 * Output:
2751 * @param metadata value associated with the key
2752 * @returns 0 on success, negative error code on failure
2753 */
2754 int metadata_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2755 {
2756 string key;
2757 bufferlist value;
2758
2759 bufferlist::iterator iter = in->begin();
2760 try {
2761 ::decode(key, iter);
2762 } catch (const buffer::error &err) {
2763 return -EINVAL;
2764 }
2765
2766 CLS_LOG(20, "metdata_get key=%s", key.c_str());
2767
2768 int r = cls_cxx_map_get_val(hctx, metadata_key_for_name(key), &value);
2769 if (r < 0) {
2770 CLS_ERR("error get metadata: %s", cpp_strerror(r).c_str());
2771 return r;
2772 }
2773
2774 ::encode(value, *out);
2775 return 0;
2776 }
2777
2778 int snapshot_get_limit(cls_method_context_t hctx, bufferlist *in,
2779 bufferlist *out)
2780 {
2781 uint64_t snap_limit;
2782 int r = read_key(hctx, "snap_limit", &snap_limit);
2783 if (r == -ENOENT) {
2784 snap_limit = UINT64_MAX;
2785 } else if (r < 0) {
2786 CLS_ERR("error retrieving snapshot limit: %s", cpp_strerror(r).c_str());
2787 return r;
2788 }
2789
2790 CLS_LOG(20, "read snapshot limit %" PRIu64, snap_limit);
2791 ::encode(snap_limit, *out);
2792
2793 return 0;
2794 }
2795
2796 int snapshot_set_limit(cls_method_context_t hctx, bufferlist *in,
2797 bufferlist *out)
2798 {
2799 int rc;
2800 uint64_t new_limit;
2801 bufferlist bl;
2802
2803 try {
2804 bufferlist::iterator iter = in->begin();
2805 ::decode(new_limit, iter);
2806 } catch (const buffer::error &err) {
2807 return -EINVAL;
2808 }
2809
2810 if (new_limit == UINT64_MAX) {
2811 CLS_LOG(20, "remove snapshot limit\n");
2812 rc = cls_cxx_map_remove_key(hctx, "snap_limit");
2813 } else {
2814 CLS_LOG(20, "set snapshot limit to %" PRIu64 "\n", new_limit);
2815 ::encode(new_limit, bl);
2816 rc = cls_cxx_map_set_val(hctx, "snap_limit", &bl);
2817 }
2818
2819 return rc;
2820 }
2821
2822
2823 /****************************** Old format *******************************/
2824
2825 int old_snapshots_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2826 {
2827 bufferlist bl;
2828 struct rbd_obj_header_ondisk *header;
2829 int rc = snap_read_header(hctx, bl);
2830 if (rc < 0)
2831 return rc;
2832
2833 header = (struct rbd_obj_header_ondisk *)bl.c_str();
2834 bufferptr p(header->snap_names_len);
2835 char *buf = (char *)header;
2836 char *name = buf + sizeof(*header) + header->snap_count * sizeof(struct rbd_obj_snap_ondisk);
2837 char *end = name + header->snap_names_len;
2838 memcpy(p.c_str(),
2839 buf + sizeof(*header) + header->snap_count * sizeof(struct rbd_obj_snap_ondisk),
2840 header->snap_names_len);
2841
2842 ::encode(header->snap_seq, *out);
2843 ::encode(header->snap_count, *out);
2844
2845 for (unsigned i = 0; i < header->snap_count; i++) {
2846 string s = name;
2847 ::encode(header->snaps[i].id, *out);
2848 ::encode(header->snaps[i].image_size, *out);
2849 ::encode(s, *out);
2850
2851 name += strlen(name) + 1;
2852 if (name > end)
2853 return -EIO;
2854 }
2855
2856 return 0;
2857 }
2858
2859 int old_snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2860 {
2861 bufferlist bl;
2862 struct rbd_obj_header_ondisk *header;
2863 bufferlist newbl;
2864 bufferptr header_bp(sizeof(*header));
2865 struct rbd_obj_snap_ondisk *new_snaps;
2866
2867 int rc = snap_read_header(hctx, bl);
2868 if (rc < 0)
2869 return rc;
2870
2871 header = (struct rbd_obj_header_ondisk *)bl.c_str();
2872
2873 int snaps_id_ofs = sizeof(*header);
2874 int names_ofs = snaps_id_ofs + sizeof(*new_snaps) * header->snap_count;
2875 const char *snap_name;
2876 const char *snap_names = ((char *)header) + names_ofs;
2877 const char *end = snap_names + header->snap_names_len;
2878 bufferlist::iterator iter = in->begin();
2879 string s;
2880 uint64_t snap_id;
2881
2882 try {
2883 ::decode(s, iter);
2884 ::decode(snap_id, iter);
2885 } catch (const buffer::error &err) {
2886 return -EINVAL;
2887 }
2888 snap_name = s.c_str();
2889
2890 if (header->snap_seq > snap_id)
2891 return -ESTALE;
2892
2893 uint64_t snap_limit;
2894 rc = read_key(hctx, "snap_limit", &snap_limit);
2895 if (rc == -ENOENT) {
2896 snap_limit = UINT64_MAX;
2897 } else if (rc < 0) {
2898 return rc;
2899 }
2900
2901 if (header->snap_count >= snap_limit)
2902 return -EDQUOT;
2903
2904 const char *cur_snap_name;
2905 for (cur_snap_name = snap_names; cur_snap_name < end; cur_snap_name += strlen(cur_snap_name) + 1) {
2906 if (strncmp(cur_snap_name, snap_name, end - cur_snap_name) == 0)
2907 return -EEXIST;
2908 }
2909 if (cur_snap_name > end)
2910 return -EIO;
2911
2912 int snap_name_len = strlen(snap_name);
2913
2914 bufferptr new_names_bp(header->snap_names_len + snap_name_len + 1);
2915 bufferptr new_snaps_bp(sizeof(*new_snaps) * (header->snap_count + 1));
2916
2917 /* copy snap names and append to new snap name */
2918 char *new_snap_names = new_names_bp.c_str();
2919 strcpy(new_snap_names, snap_name);
2920 memcpy(new_snap_names + snap_name_len + 1, snap_names, header->snap_names_len);
2921
2922 /* append new snap id */
2923 new_snaps = (struct rbd_obj_snap_ondisk *)new_snaps_bp.c_str();
2924 memcpy(new_snaps + 1, header->snaps, sizeof(*new_snaps) * header->snap_count);
2925
2926 header->snap_count = header->snap_count + 1;
2927 header->snap_names_len = header->snap_names_len + snap_name_len + 1;
2928 header->snap_seq = snap_id;
2929
2930 new_snaps[0].id = snap_id;
2931 new_snaps[0].image_size = header->image_size;
2932
2933 memcpy(header_bp.c_str(), header, sizeof(*header));
2934
2935 newbl.push_back(header_bp);
2936 newbl.push_back(new_snaps_bp);
2937 newbl.push_back(new_names_bp);
2938
2939 rc = cls_cxx_write_full(hctx, &newbl);
2940 if (rc < 0)
2941 return rc;
2942
2943 return 0;
2944 }
2945
2946 int old_snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2947 {
2948 bufferlist bl;
2949 struct rbd_obj_header_ondisk *header;
2950 bufferlist newbl;
2951 bufferptr header_bp(sizeof(*header));
2952
2953 int rc = snap_read_header(hctx, bl);
2954 if (rc < 0)
2955 return rc;
2956
2957 header = (struct rbd_obj_header_ondisk *)bl.c_str();
2958
2959 int snaps_id_ofs = sizeof(*header);
2960 int names_ofs = snaps_id_ofs + sizeof(struct rbd_obj_snap_ondisk) * header->snap_count;
2961 const char *snap_name;
2962 const char *snap_names = ((char *)header) + names_ofs;
2963 const char *orig_names = snap_names;
2964 const char *end = snap_names + header->snap_names_len;
2965 bufferlist::iterator iter = in->begin();
2966 string s;
2967 unsigned i;
2968 bool found = false;
2969 struct rbd_obj_snap_ondisk snap;
2970
2971 try {
2972 ::decode(s, iter);
2973 } catch (const buffer::error &err) {
2974 return -EINVAL;
2975 }
2976 snap_name = s.c_str();
2977
2978 for (i = 0; snap_names < end; i++) {
2979 if (strcmp(snap_names, snap_name) == 0) {
2980 snap = header->snaps[i];
2981 found = true;
2982 break;
2983 }
2984 snap_names += strlen(snap_names) + 1;
2985 }
2986 if (!found) {
2987 CLS_ERR("couldn't find snap %s\n", snap_name);
2988 return -ENOENT;
2989 }
2990
2991 header->snap_names_len = header->snap_names_len - (s.length() + 1);
2992 header->snap_count = header->snap_count - 1;
2993
2994 bufferptr new_names_bp(header->snap_names_len);
2995 bufferptr new_snaps_bp(sizeof(header->snaps[0]) * header->snap_count);
2996
2997 memcpy(header_bp.c_str(), header, sizeof(*header));
2998 newbl.push_back(header_bp);
2999
3000 if (header->snap_count) {
3001 int snaps_len = 0;
3002 int names_len = 0;
3003 CLS_LOG(20, "i=%u\n", i);
3004 if (i > 0) {
3005 snaps_len = sizeof(header->snaps[0]) * i;
3006 names_len = snap_names - orig_names;
3007 memcpy(new_snaps_bp.c_str(), header->snaps, snaps_len);
3008 memcpy(new_names_bp.c_str(), orig_names, names_len);
3009 }
3010 snap_names += s.length() + 1;
3011
3012 if (i < header->snap_count) {
3013 memcpy(new_snaps_bp.c_str() + snaps_len,
3014 header->snaps + i + 1,
3015 sizeof(header->snaps[0]) * (header->snap_count - i));
3016 memcpy(new_names_bp.c_str() + names_len, snap_names , end - snap_names);
3017 }
3018 newbl.push_back(new_snaps_bp);
3019 newbl.push_back(new_names_bp);
3020 }
3021
3022 rc = cls_cxx_write_full(hctx, &newbl);
3023 if (rc < 0)
3024 return rc;
3025
3026 return 0;
3027 }
3028
3029 /**
3030 * rename snapshot of old format.
3031 *
3032 * Input:
3033 * @param src_snap_id old snap id of the snapshot (snapid_t)
3034 * @param dst_snap_name new name of the snapshot (string)
3035 *
3036 * Output:
3037 * @returns 0 on success, negative error code on failure.
3038 */
3039 int old_snapshot_rename(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3040 {
3041 bufferlist bl;
3042 struct rbd_obj_header_ondisk *header;
3043 bufferlist newbl;
3044 bufferptr header_bp(sizeof(*header));
3045 snapid_t src_snap_id;
3046 const char *dst_snap_name;
3047 string dst;
3048
3049 int rc = snap_read_header(hctx, bl);
3050 if (rc < 0)
3051 return rc;
3052
3053 header = (struct rbd_obj_header_ondisk *)bl.c_str();
3054
3055 int snaps_id_ofs = sizeof(*header);
3056 int names_ofs = snaps_id_ofs + sizeof(rbd_obj_snap_ondisk) * header->snap_count;
3057 const char *snap_names = ((char *)header) + names_ofs;
3058 const char *orig_names = snap_names;
3059 const char *end = snap_names + header->snap_names_len;
3060 bufferlist::iterator iter = in->begin();
3061 unsigned i;
3062 bool found = false;
3063
3064 try {
3065 ::decode(src_snap_id, iter);
3066 ::decode(dst, iter);
3067 } catch (const buffer::error &err) {
3068 return -EINVAL;
3069 }
3070 dst_snap_name = dst.c_str();
3071
3072 const char *cur_snap_name;
3073 for (cur_snap_name = snap_names; cur_snap_name < end;
3074 cur_snap_name += strlen(cur_snap_name) + 1) {
3075 if (strcmp(cur_snap_name, dst_snap_name) == 0)
3076 return -EEXIST;
3077 }
3078 if (cur_snap_name > end)
3079 return -EIO;
3080 for (i = 0; i < header->snap_count; i++) {
3081 if (src_snap_id == header->snaps[i].id) {
3082 found = true;
3083 break;
3084 }
3085 snap_names += strlen(snap_names) + 1;
3086 }
3087 if (!found) {
3088 CLS_ERR("couldn't find snap %llu\n", (unsigned long long)src_snap_id.val);
3089 return -ENOENT;
3090 }
3091
3092 CLS_LOG(20, "rename snap with snap id %llu to dest name %s", (unsigned long long)src_snap_id.val, dst_snap_name);
3093 header->snap_names_len = header->snap_names_len - strlen(snap_names) + dst.length();
3094
3095 bufferptr new_names_bp(header->snap_names_len);
3096 bufferptr new_snaps_bp(sizeof(header->snaps[0]) * header->snap_count);
3097
3098 if (header->snap_count) {
3099 int names_len = 0;
3100 CLS_LOG(20, "i=%u\n", i);
3101 if (i > 0) {
3102 names_len = snap_names - orig_names;
3103 memcpy(new_names_bp.c_str(), orig_names, names_len);
3104 }
3105 strcpy(new_names_bp.c_str() + names_len, dst_snap_name);
3106 names_len += strlen(dst_snap_name) + 1;
3107 snap_names += strlen(snap_names) + 1;
3108 if (i < header->snap_count) {
3109 memcpy(new_names_bp.c_str() + names_len, snap_names , end - snap_names);
3110 }
3111 memcpy(new_snaps_bp.c_str(), header->snaps, sizeof(header->snaps[0]) * header->snap_count);
3112 }
3113
3114 memcpy(header_bp.c_str(), header, sizeof(*header));
3115 newbl.push_back(header_bp);
3116 newbl.push_back(new_snaps_bp);
3117 newbl.push_back(new_names_bp);
3118
3119 rc = cls_cxx_write_full(hctx, &newbl);
3120 if (rc < 0)
3121 return rc;
3122 return 0;
3123 }
3124
3125
3126 namespace mirror {
3127
// fixed keys used by the mirroring methods
static const std::string UUID = "mirror_uuid";
static const std::string MODE = "mirror_mode";

// key prefixes; the full key is the prefix followed by an identifier
static const std::string PEER_KEY_PREFIX = "mirror_peer_";
static const std::string IMAGE_KEY_PREFIX = "image_";
static const std::string GLOBAL_KEY_PREFIX = "global_";
static const std::string STATUS_GLOBAL_KEY_PREFIX = "status_global_";
static const std::string INSTANCE_KEY_PREFIX = "instance_";
3135
3136 std::string peer_key(const std::string &uuid) {
3137 return PEER_KEY_PREFIX + uuid;
3138 }
3139
3140 std::string image_key(const string &image_id) {
3141 return IMAGE_KEY_PREFIX + image_id;
3142 }
3143
3144 std::string global_key(const string &global_id) {
3145 return GLOBAL_KEY_PREFIX + global_id;
3146 }
3147
3148 std::string status_global_key(const string &global_id) {
3149 return STATUS_GLOBAL_KEY_PREFIX + global_id;
3150 }
3151
3152 std::string instance_key(const string &instance_id) {
3153 return INSTANCE_KEY_PREFIX + instance_id;
3154 }
3155
3156 int uuid_get(cls_method_context_t hctx, std::string *mirror_uuid) {
3157 bufferlist mirror_uuid_bl;
3158 int r = cls_cxx_map_get_val(hctx, mirror::UUID, &mirror_uuid_bl);
3159 if (r < 0) {
3160 if (r != -ENOENT) {
3161 CLS_ERR("error reading mirror uuid: %s", cpp_strerror(r).c_str());
3162 }
3163 return r;
3164 }
3165
3166 *mirror_uuid = std::string(mirror_uuid_bl.c_str(), mirror_uuid_bl.length());
3167 return 0;
3168 }
3169
3170 int read_peers(cls_method_context_t hctx,
3171 std::vector<cls::rbd::MirrorPeer> *peers) {
3172 std::string last_read = PEER_KEY_PREFIX;
3173 int max_read = RBD_MAX_KEYS_READ;
3174 bool more = true;
3175 while (more) {
3176 std::map<std::string, bufferlist> vals;
3177 int r = cls_cxx_map_get_vals(hctx, last_read, PEER_KEY_PREFIX.c_str(),
3178 max_read, &vals, &more);
3179 if (r < 0) {
3180 CLS_ERR("error reading peers: %s", cpp_strerror(r).c_str());
3181 return r;
3182 }
3183
3184 for (auto &it : vals) {
3185 try {
3186 bufferlist::iterator bl_it = it.second.begin();
3187 cls::rbd::MirrorPeer peer;
3188 ::decode(peer, bl_it);
3189 peers->push_back(peer);
3190 } catch (const buffer::error &err) {
3191 CLS_ERR("could not decode peer '%s'", it.first.c_str());
3192 return -EIO;
3193 }
3194 }
3195
3196 if (!vals.empty()) {
3197 last_read = vals.rbegin()->first;
3198 }
3199 }
3200 return 0;
3201 }
3202
3203 int read_peer(cls_method_context_t hctx, const std::string &id,
3204 cls::rbd::MirrorPeer *peer) {
3205 bufferlist bl;
3206 int r = cls_cxx_map_get_val(hctx, peer_key(id), &bl);
3207 if (r < 0) {
3208 CLS_ERR("error reading peer '%s': %s", id.c_str(),
3209 cpp_strerror(r).c_str());
3210 return r;
3211 }
3212
3213 try {
3214 bufferlist::iterator bl_it = bl.begin();
3215 ::decode(*peer, bl_it);
3216 } catch (const buffer::error &err) {
3217 CLS_ERR("could not decode peer '%s'", id.c_str());
3218 return -EIO;
3219 }
3220 return 0;
3221 }
3222
3223 int write_peer(cls_method_context_t hctx, const std::string &id,
3224 const cls::rbd::MirrorPeer &peer) {
3225 bufferlist bl;
3226 ::encode(peer, bl);
3227
3228 int r = cls_cxx_map_set_val(hctx, peer_key(id), &bl);
3229 if (r < 0) {
3230 CLS_ERR("error writing peer '%s': %s", id.c_str(),
3231 cpp_strerror(r).c_str());
3232 return r;
3233 }
3234 return 0;
3235 }
3236
3237 int image_get(cls_method_context_t hctx, const string &image_id,
3238 cls::rbd::MirrorImage *mirror_image) {
3239 bufferlist bl;
3240 int r = cls_cxx_map_get_val(hctx, image_key(image_id), &bl);
3241 if (r < 0) {
3242 if (r != -ENOENT) {
3243 CLS_ERR("error reading mirrored image '%s': '%s'", image_id.c_str(),
3244 cpp_strerror(r).c_str());
3245 }
3246 return r;
3247 }
3248
3249 try {
3250 bufferlist::iterator it = bl.begin();
3251 ::decode(*mirror_image, it);
3252 } catch (const buffer::error &err) {
3253 CLS_ERR("could not decode mirrored image '%s'", image_id.c_str());
3254 return -EIO;
3255 }
3256
3257 return 0;
3258 }
3259
3260 int image_set(cls_method_context_t hctx, const string &image_id,
3261 const cls::rbd::MirrorImage &mirror_image) {
3262 bufferlist bl;
3263 ::encode(mirror_image, bl);
3264
3265 cls::rbd::MirrorImage existing_mirror_image;
3266 int r = image_get(hctx, image_id, &existing_mirror_image);
3267 if (r == -ENOENT) {
3268 // make sure global id doesn't already exist
3269 std::string global_id_key = global_key(mirror_image.global_image_id);
3270 std::string image_id;
3271 r = read_key(hctx, global_id_key, &image_id);
3272 if (r >= 0) {
3273 return -EEXIST;
3274 } else if (r != -ENOENT) {
3275 CLS_ERR("error reading global image id: '%s': '%s'", image_id.c_str(),
3276 cpp_strerror(r).c_str());
3277 return r;
3278 }
3279
3280 // make sure this was not a race for disabling
3281 if (mirror_image.state == cls::rbd::MIRROR_IMAGE_STATE_DISABLING) {
3282 CLS_ERR("image '%s' is already disabled", image_id.c_str());
3283 return r;
3284 }
3285 } else if (r < 0) {
3286 CLS_ERR("error reading mirrored image '%s': '%s'", image_id.c_str(),
3287 cpp_strerror(r).c_str());
3288 return r;
3289 } else if (existing_mirror_image.global_image_id !=
3290 mirror_image.global_image_id) {
3291 // cannot change the global id
3292 return -EINVAL;
3293 }
3294
3295 r = cls_cxx_map_set_val(hctx, image_key(image_id), &bl);
3296 if (r < 0) {
3297 CLS_ERR("error adding mirrored image '%s': %s", image_id.c_str(),
3298 cpp_strerror(r).c_str());
3299 return r;
3300 }
3301
3302 bufferlist image_id_bl;
3303 ::encode(image_id, image_id_bl);
3304 r = cls_cxx_map_set_val(hctx, global_key(mirror_image.global_image_id),
3305 &image_id_bl);
3306 if (r < 0) {
3307 CLS_ERR("error adding global id for image '%s': %s", image_id.c_str(),
3308 cpp_strerror(r).c_str());
3309 return r;
3310 }
3311 return 0;
3312 }
3313
3314 int image_remove(cls_method_context_t hctx, const string &image_id) {
3315 bufferlist bl;
3316 cls::rbd::MirrorImage mirror_image;
3317 int r = image_get(hctx, image_id, &mirror_image);
3318 if (r < 0) {
3319 if (r != -ENOENT) {
3320 CLS_ERR("error reading mirrored image '%s': '%s'", image_id.c_str(),
3321 cpp_strerror(r).c_str());
3322 }
3323 return r;
3324 }
3325
3326 if (mirror_image.state != cls::rbd::MIRROR_IMAGE_STATE_DISABLING) {
3327 return -EBUSY;
3328 }
3329
3330 r = cls_cxx_map_remove_key(hctx, image_key(image_id));
3331 if (r < 0) {
3332 CLS_ERR("error removing mirrored image '%s': %s", image_id.c_str(),
3333 cpp_strerror(r).c_str());
3334 return r;
3335 }
3336
3337 r = cls_cxx_map_remove_key(hctx, global_key(mirror_image.global_image_id));
3338 if (r < 0 && r != -ENOENT) {
3339 CLS_ERR("error removing global id for image '%s': %s", image_id.c_str(),
3340 cpp_strerror(r).c_str());
3341 return r;
3342 }
3343
3344 r = cls_cxx_map_remove_key(hctx,
3345 status_global_key(mirror_image.global_image_id));
3346 if (r < 0 && r != -ENOENT) {
3347 CLS_ERR("error removing global status for image '%s': %s", image_id.c_str(),
3348 cpp_strerror(r).c_str());
3349 return r;
3350 }
3351
3352 return 0;
3353 }
3354
3355 struct MirrorImageStatusOnDisk : cls::rbd::MirrorImageStatus {
3356 entity_inst_t origin;
3357
3358 MirrorImageStatusOnDisk() {
3359 }
3360 MirrorImageStatusOnDisk(const cls::rbd::MirrorImageStatus &status) :
3361 cls::rbd::MirrorImageStatus(status) {
3362 }
3363
3364 void encode_meta(bufferlist &bl, uint64_t features) const {
3365 ENCODE_START(1, 1, bl);
3366 ::encode(origin, bl, features);
3367 ENCODE_FINISH(bl);
3368 }
3369
3370 void encode(bufferlist &bl, uint64_t features) const {
3371 encode_meta(bl, features);
3372 cls::rbd::MirrorImageStatus::encode(bl);
3373 }
3374
3375 void decode_meta(bufferlist::iterator &it) {
3376 DECODE_START(1, it);
3377 ::decode(origin, it);
3378 DECODE_FINISH(it);
3379 }
3380
3381 void decode(bufferlist::iterator &it) {
3382 decode_meta(it);
3383 cls::rbd::MirrorImageStatus::decode(it);
3384 }
3385 };
3386 WRITE_CLASS_ENCODER_FEATURES(MirrorImageStatusOnDisk)
3387
3388 int image_status_set(cls_method_context_t hctx, const string &global_image_id,
3389 const cls::rbd::MirrorImageStatus &status) {
3390 MirrorImageStatusOnDisk ondisk_status(status);
3391 ondisk_status.up = false;
3392 ondisk_status.last_update = ceph_clock_now();
3393
3394 int r = cls_get_request_origin(hctx, &ondisk_status.origin);
3395 assert(r == 0);
3396
3397 bufferlist bl;
3398 encode(ondisk_status, bl, cls_get_features(hctx));
3399
3400 r = cls_cxx_map_set_val(hctx, status_global_key(global_image_id), &bl);
3401 if (r < 0) {
3402 CLS_ERR("error setting status for mirrored image, global id '%s': %s",
3403 global_image_id.c_str(), cpp_strerror(r).c_str());
3404 return r;
3405 }
3406 return 0;
3407 }
3408
3409 int image_status_remove(cls_method_context_t hctx,
3410 const string &global_image_id) {
3411
3412 int r = cls_cxx_map_remove_key(hctx, status_global_key(global_image_id));
3413 if (r < 0) {
3414 CLS_ERR("error removing status for mirrored image, global id '%s': %s",
3415 global_image_id.c_str(), cpp_strerror(r).c_str());
3416 return r;
3417 }
3418 return 0;
3419 }
3420
3421 int image_status_get(cls_method_context_t hctx, const string &global_image_id,
3422 cls::rbd::MirrorImageStatus *status) {
3423
3424 bufferlist bl;
3425 int r = cls_cxx_map_get_val(hctx, status_global_key(global_image_id), &bl);
3426 if (r < 0) {
3427 if (r != -ENOENT) {
3428 CLS_ERR("error reading status for mirrored image, global id '%s': '%s'",
3429 global_image_id.c_str(), cpp_strerror(r).c_str());
3430 }
3431 return r;
3432 }
3433
3434 MirrorImageStatusOnDisk ondisk_status;
3435 try {
3436 bufferlist::iterator it = bl.begin();
3437 decode(ondisk_status, it);
3438 } catch (const buffer::error &err) {
3439 CLS_ERR("could not decode status for mirrored image, global id '%s'",
3440 global_image_id.c_str());
3441 return -EIO;
3442 }
3443
3444 obj_list_watch_response_t watchers;
3445 r = cls_cxx_list_watchers(hctx, &watchers);
3446 if (r < 0 && r != -ENOENT) {
3447 CLS_ERR("error listing watchers: '%s'", cpp_strerror(r).c_str());
3448 return r;
3449 }
3450
3451 *status = static_cast<cls::rbd::MirrorImageStatus>(ondisk_status);
3452 status->up = false;
3453 for (auto &w : watchers.entries) {
3454 if (w.name == ondisk_status.origin.name &&
3455 w.addr == ondisk_status.origin.addr) {
3456 status->up = true;
3457 break;
3458 }
3459 }
3460
3461 return 0;
3462 }
3463
3464 int image_status_list(cls_method_context_t hctx,
3465 const std::string &start_after, uint64_t max_return,
3466 map<std::string, cls::rbd::MirrorImage> *mirror_images,
3467 map<std::string, cls::rbd::MirrorImageStatus> *mirror_statuses) {
3468 std::string last_read = image_key(start_after);
3469 int max_read = RBD_MAX_KEYS_READ;
3470 bool more = true;
3471
3472 while (more && mirror_images->size() < max_return) {
3473 std::map<std::string, bufferlist> vals;
3474 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
3475 int r = cls_cxx_map_get_vals(hctx, last_read, IMAGE_KEY_PREFIX, max_read,
3476 &vals, &more);
3477 if (r < 0) {
3478 CLS_ERR("error reading mirror image directory by name: %s",
3479 cpp_strerror(r).c_str());
3480 return r;
3481 }
3482
3483 for (auto it = vals.begin(); it != vals.end() &&
3484 mirror_images->size() < max_return; ++it) {
3485 const std::string &image_id = it->first.substr(IMAGE_KEY_PREFIX.size());
3486 cls::rbd::MirrorImage mirror_image;
3487 bufferlist::iterator iter = it->second.begin();
3488 try {
3489 ::decode(mirror_image, iter);
3490 } catch (const buffer::error &err) {
3491 CLS_ERR("could not decode mirror image payload of image '%s'",
3492 image_id.c_str());
3493 return -EIO;
3494 }
3495
3496 (*mirror_images)[image_id] = mirror_image;
3497
3498 cls::rbd::MirrorImageStatus status;
3499 int r1 = image_status_get(hctx, mirror_image.global_image_id, &status);
3500 if (r1 < 0) {
3501 continue;
3502 }
3503
3504 (*mirror_statuses)[image_id] = status;
3505 }
3506 if (!vals.empty()) {
3507 last_read = image_key(mirror_images->rbegin()->first);
3508 }
3509 }
3510
3511 return 0;
3512 }
3513
3514 int image_status_get_summary(cls_method_context_t hctx,
3515 std::map<cls::rbd::MirrorImageStatusState, int> *states) {
3516 obj_list_watch_response_t watchers_;
3517 int r = cls_cxx_list_watchers(hctx, &watchers_);
3518 if (r < 0) {
3519 if (r != -ENOENT) {
3520 CLS_ERR("error listing watchers: '%s'", cpp_strerror(r).c_str());
3521 }
3522 return r;
3523 }
3524
3525 set<entity_inst_t> watchers;
3526 for (auto &w : watchers_.entries) {
3527 watchers.insert(entity_inst_t(w.name, w.addr));
3528 }
3529
3530 states->clear();
3531
3532 string last_read = IMAGE_KEY_PREFIX;
3533 int max_read = RBD_MAX_KEYS_READ;
3534 bool more = true;
3535 while (more) {
3536 map<string, bufferlist> vals;
3537 r = cls_cxx_map_get_vals(hctx, last_read, IMAGE_KEY_PREFIX,
3538 max_read, &vals, &more);
3539 if (r < 0) {
3540 CLS_ERR("error reading mirrored images: %s", cpp_strerror(r).c_str());
3541 return r;
3542 }
3543
3544 for (auto &list_it : vals) {
3545 const string &key = list_it.first;
3546
3547 if (0 != key.compare(0, IMAGE_KEY_PREFIX.size(), IMAGE_KEY_PREFIX)) {
3548 break;
3549 }
3550
3551 cls::rbd::MirrorImage mirror_image;
3552 bufferlist::iterator iter = list_it.second.begin();
3553 try {
3554 ::decode(mirror_image, iter);
3555 } catch (const buffer::error &err) {
3556 CLS_ERR("could not decode mirror image payload for key '%s'",
3557 key.c_str());
3558 return -EIO;
3559 }
3560
3561 cls::rbd::MirrorImageStatus status;
3562 image_status_get(hctx, mirror_image.global_image_id, &status);
3563
3564 cls::rbd::MirrorImageStatusState state = status.up ? status.state :
3565 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
3566 (*states)[state]++;
3567 }
3568
3569 if (!vals.empty()) {
3570 last_read = vals.rbegin()->first;
3571 }
3572 }
3573
3574 return 0;
3575 }
3576
3577 int image_status_remove_down(cls_method_context_t hctx) {
3578 obj_list_watch_response_t watchers_;
3579 int r = cls_cxx_list_watchers(hctx, &watchers_);
3580 if (r < 0) {
3581 if (r != -ENOENT) {
3582 CLS_ERR("error listing watchers: '%s'", cpp_strerror(r).c_str());
3583 }
3584 return r;
3585 }
3586
3587 set<entity_inst_t> watchers;
3588 for (auto &w : watchers_.entries) {
3589 watchers.insert(entity_inst_t(w.name, w.addr));
3590 }
3591
3592 string last_read = STATUS_GLOBAL_KEY_PREFIX;
3593 int max_read = RBD_MAX_KEYS_READ;
3594 bool more = true;
3595 while (more) {
3596 map<string, bufferlist> vals;
3597 r = cls_cxx_map_get_vals(hctx, last_read, STATUS_GLOBAL_KEY_PREFIX,
3598 max_read, &vals, &more);
3599 if (r < 0) {
3600 CLS_ERR("error reading mirrored images: %s", cpp_strerror(r).c_str());
3601 return r;
3602 }
3603
3604 for (auto &list_it : vals) {
3605 const string &key = list_it.first;
3606
3607 if (0 != key.compare(0, STATUS_GLOBAL_KEY_PREFIX.size(),
3608 STATUS_GLOBAL_KEY_PREFIX)) {
3609 break;
3610 }
3611
3612 MirrorImageStatusOnDisk status;
3613 try {
3614 bufferlist::iterator it = list_it.second.begin();
3615 status.decode_meta(it);
3616 } catch (const buffer::error &err) {
3617 CLS_ERR("could not decode status metadata for mirrored image '%s'",
3618 key.c_str());
3619 return -EIO;
3620 }
3621
3622 if (watchers.find(status.origin) == watchers.end()) {
3623 CLS_LOG(20, "removing stale status object for key %s",
3624 key.c_str());
3625 int r1 = cls_cxx_map_remove_key(hctx, key);
3626 if (r1 < 0) {
3627 CLS_ERR("error removing stale status for key '%s': %s",
3628 key.c_str(), cpp_strerror(r1).c_str());
3629 return r1;
3630 }
3631 }
3632 }
3633
3634 if (!vals.empty()) {
3635 last_read = vals.rbegin()->first;
3636 }
3637 }
3638
3639 return 0;
3640 }
3641
3642 int instances_list(cls_method_context_t hctx,
3643 std::vector<std::string> *instance_ids) {
3644 std::string last_read = INSTANCE_KEY_PREFIX;
3645 int max_read = RBD_MAX_KEYS_READ;
3646 bool more = true;
3647 while (more) {
3648 std::map<std::string, bufferlist> vals;
3649 int r = cls_cxx_map_get_vals(hctx, last_read, INSTANCE_KEY_PREFIX.c_str(),
3650 max_read, &vals, &more);
3651 if (r < 0) {
3652 if (r != -ENOENT) {
3653 CLS_ERR("error reading mirror instances: %s", cpp_strerror(r).c_str());
3654 }
3655 return r;
3656 }
3657
3658 for (auto &it : vals) {
3659 instance_ids->push_back(it.first.substr(INSTANCE_KEY_PREFIX.size()));
3660 }
3661
3662 if (!vals.empty()) {
3663 last_read = vals.rbegin()->first;
3664 }
3665 }
3666 return 0;
3667 }
3668
3669 int instances_add(cls_method_context_t hctx, const string &instance_id) {
3670 bufferlist bl;
3671
3672 int r = cls_cxx_map_set_val(hctx, instance_key(instance_id), &bl);
3673 if (r < 0) {
3674 CLS_ERR("error setting mirror instance %s: %s", instance_id.c_str(),
3675 cpp_strerror(r).c_str());
3676 return r;
3677 }
3678 return 0;
3679 }
3680
3681 int instances_remove(cls_method_context_t hctx, const string &instance_id) {
3682
3683 int r = cls_cxx_map_remove_key(hctx, instance_key(instance_id));
3684 if (r < 0) {
3685 CLS_ERR("error removing mirror instance %s: %s", instance_id.c_str(),
3686 cpp_strerror(r).c_str());
3687 return r;
3688 }
3689 return 0;
3690 }
3691
3692 } // namespace mirror
3693
3694 /**
3695 * Input:
3696 * none
3697 *
3698 * Output:
3699 * @param uuid (std::string)
3700 * @returns 0 on success, negative error code on failure
3701 */
3702 int mirror_uuid_get(cls_method_context_t hctx, bufferlist *in,
3703 bufferlist *out) {
3704 std::string mirror_uuid;
3705 int r = mirror::uuid_get(hctx, &mirror_uuid);
3706 if (r < 0) {
3707 return r;
3708 }
3709
3710 ::encode(mirror_uuid, *out);
3711 return 0;
3712 }
3713
3714 /**
3715 * Input:
3716 * @param mirror_uuid (std::string)
3717 *
3718 * Output:
3719 * @returns 0 on success, negative error code on failure
3720 */
3721 int mirror_uuid_set(cls_method_context_t hctx, bufferlist *in,
3722 bufferlist *out) {
3723 std::string mirror_uuid;
3724 try {
3725 bufferlist::iterator bl_it = in->begin();
3726 ::decode(mirror_uuid, bl_it);
3727 } catch (const buffer::error &err) {
3728 return -EINVAL;
3729 }
3730
3731 if (mirror_uuid.empty()) {
3732 CLS_ERR("cannot set empty mirror uuid");
3733 return -EINVAL;
3734 }
3735
3736 uint32_t mirror_mode;
3737 int r = read_key(hctx, mirror::MODE, &mirror_mode);
3738 if (r < 0 && r != -ENOENT) {
3739 return r;
3740 } else if (r == 0 && mirror_mode != cls::rbd::MIRROR_MODE_DISABLED) {
3741 CLS_ERR("cannot set mirror uuid while mirroring enabled");
3742 return -EINVAL;
3743 }
3744
3745 bufferlist mirror_uuid_bl;
3746 mirror_uuid_bl.append(mirror_uuid);
3747 r = cls_cxx_map_set_val(hctx, mirror::UUID, &mirror_uuid_bl);
3748 if (r < 0) {
3749 CLS_ERR("failed to set mirror uuid");
3750 return r;
3751 }
3752 return 0;
3753 }
3754
3755 /**
3756 * Input:
3757 * none
3758 *
3759 * Output:
3760 * @param cls::rbd::MirrorMode (uint32_t)
3761 * @returns 0 on success, negative error code on failure
3762 */
3763 int mirror_mode_get(cls_method_context_t hctx, bufferlist *in,
3764 bufferlist *out) {
3765 uint32_t mirror_mode_decode;
3766 int r = read_key(hctx, mirror::MODE, &mirror_mode_decode);
3767 if (r < 0) {
3768 return r;
3769 }
3770
3771 ::encode(mirror_mode_decode, *out);
3772 return 0;
3773 }
3774
3775 /**
3776 * Input:
3777 * @param mirror_mode (cls::rbd::MirrorMode) (uint32_t)
3778 *
3779 * Output:
3780 * @returns 0 on success, negative error code on failure
3781 */
3782 int mirror_mode_set(cls_method_context_t hctx, bufferlist *in,
3783 bufferlist *out) {
3784 uint32_t mirror_mode_decode;
3785 try {
3786 bufferlist::iterator bl_it = in->begin();
3787 ::decode(mirror_mode_decode, bl_it);
3788 } catch (const buffer::error &err) {
3789 return -EINVAL;
3790 }
3791
3792 bool enabled;
3793 switch (static_cast<cls::rbd::MirrorMode>(mirror_mode_decode)) {
3794 case cls::rbd::MIRROR_MODE_DISABLED:
3795 enabled = false;
3796 break;
3797 case cls::rbd::MIRROR_MODE_IMAGE:
3798 case cls::rbd::MIRROR_MODE_POOL:
3799 enabled = true;
3800 break;
3801 default:
3802 CLS_ERR("invalid mirror mode: %d", mirror_mode_decode);
3803 return -EINVAL;
3804 }
3805
3806 int r;
3807 if (enabled) {
3808 std::string mirror_uuid;
3809 r = mirror::uuid_get(hctx, &mirror_uuid);
3810 if (r == -ENOENT) {
3811 return -EINVAL;
3812 } else if (r < 0) {
3813 return r;
3814 }
3815
3816 bufferlist bl;
3817 ::encode(mirror_mode_decode, bl);
3818
3819 r = cls_cxx_map_set_val(hctx, mirror::MODE, &bl);
3820 if (r < 0) {
3821 CLS_ERR("error enabling mirroring: %s", cpp_strerror(r).c_str());
3822 return r;
3823 }
3824 } else {
3825 std::vector<cls::rbd::MirrorPeer> peers;
3826 r = mirror::read_peers(hctx, &peers);
3827 if (r < 0 && r != -ENOENT) {
3828 return r;
3829 }
3830
3831 if (!peers.empty()) {
3832 CLS_ERR("mirroring peers still registered");
3833 return -EBUSY;
3834 }
3835
3836 r = remove_key(hctx, mirror::MODE);
3837 if (r < 0) {
3838 return r;
3839 }
3840
3841 r = remove_key(hctx, mirror::UUID);
3842 if (r < 0) {
3843 return r;
3844 }
3845 }
3846 return 0;
3847 }
3848
3849 /**
3850 * Input:
3851 * none
3852 *
3853 * Output:
3854 * @param std::vector<cls::rbd::MirrorPeer>: collection of peers
3855 * @returns 0 on success, negative error code on failure
3856 */
3857 int mirror_peer_list(cls_method_context_t hctx, bufferlist *in,
3858 bufferlist *out) {
3859 std::vector<cls::rbd::MirrorPeer> peers;
3860 int r = mirror::read_peers(hctx, &peers);
3861 if (r < 0 && r != -ENOENT) {
3862 return r;
3863 }
3864
3865 ::encode(peers, *out);
3866 return 0;
3867 }
3868
3869 /**
3870 * Input:
3871 * @param mirror_peer (cls::rbd::MirrorPeer)
3872 *
3873 * Output:
3874 * @returns 0 on success, negative error code on failure
3875 */
3876 int mirror_peer_add(cls_method_context_t hctx, bufferlist *in,
3877 bufferlist *out) {
3878 cls::rbd::MirrorPeer mirror_peer;
3879 try {
3880 bufferlist::iterator it = in->begin();
3881 ::decode(mirror_peer, it);
3882 } catch (const buffer::error &err) {
3883 return -EINVAL;
3884 }
3885
3886 uint32_t mirror_mode_decode;
3887 int r = read_key(hctx, mirror::MODE, &mirror_mode_decode);
3888 if (r < 0 && r != -ENOENT) {
3889 return r;
3890 } else if (r == -ENOENT ||
3891 mirror_mode_decode == cls::rbd::MIRROR_MODE_DISABLED) {
3892 CLS_ERR("mirroring must be enabled on the pool");
3893 return -EINVAL;
3894 } else if (!mirror_peer.is_valid()) {
3895 CLS_ERR("mirror peer is not valid");
3896 return -EINVAL;
3897 }
3898
3899 std::string mirror_uuid;
3900 r = mirror::uuid_get(hctx, &mirror_uuid);
3901 if (r < 0) {
3902 CLS_ERR("error retrieving mirroring uuid: %s", cpp_strerror(r).c_str());
3903 return r;
3904 } else if (mirror_peer.uuid == mirror_uuid) {
3905 CLS_ERR("peer uuid '%s' matches pool mirroring uuid",
3906 mirror_uuid.c_str());
3907 return -EINVAL;
3908 }
3909
3910 std::vector<cls::rbd::MirrorPeer> peers;
3911 r = mirror::read_peers(hctx, &peers);
3912 if (r < 0 && r != -ENOENT) {
3913 return r;
3914 }
3915
3916 for (auto const &peer : peers) {
3917 if (peer.uuid == mirror_peer.uuid) {
3918 CLS_ERR("peer uuid '%s' already exists",
3919 peer.uuid.c_str());
3920 return -ESTALE;
3921 } else if (peer.cluster_name == mirror_peer.cluster_name &&
3922 (peer.pool_id == -1 || mirror_peer.pool_id == -1 ||
3923 peer.pool_id == mirror_peer.pool_id)) {
3924 CLS_ERR("peer cluster name '%s' already exists",
3925 peer.cluster_name.c_str());
3926 return -EEXIST;
3927 }
3928 }
3929
3930 bufferlist bl;
3931 ::encode(mirror_peer, bl);
3932 r = cls_cxx_map_set_val(hctx, mirror::peer_key(mirror_peer.uuid),
3933 &bl);
3934 if (r < 0) {
3935 CLS_ERR("error adding peer: %s", cpp_strerror(r).c_str());
3936 return r;
3937 }
3938 return 0;
3939 }
3940
3941 /**
3942 * Input:
3943 * @param uuid (std::string)
3944 *
3945 * Output:
3946 * @returns 0 on success, negative error code on failure
3947 */
3948 int mirror_peer_remove(cls_method_context_t hctx, bufferlist *in,
3949 bufferlist *out) {
3950 std::string uuid;
3951 try {
3952 bufferlist::iterator it = in->begin();
3953 ::decode(uuid, it);
3954 } catch (const buffer::error &err) {
3955 return -EINVAL;
3956 }
3957
3958 int r = cls_cxx_map_remove_key(hctx, mirror::peer_key(uuid));
3959 if (r < 0 && r != -ENOENT) {
3960 CLS_ERR("error removing peer: %s", cpp_strerror(r).c_str());
3961 return r;
3962 }
3963 return 0;
3964 }
3965
3966 /**
3967 * Input:
3968 * @param uuid (std::string)
3969 * @param client_name (std::string)
3970 *
3971 * Output:
3972 * @returns 0 on success, negative error code on failure
3973 */
3974 int mirror_peer_set_client(cls_method_context_t hctx, bufferlist *in,
3975 bufferlist *out) {
3976 std::string uuid;
3977 std::string client_name;
3978 try {
3979 bufferlist::iterator it = in->begin();
3980 ::decode(uuid, it);
3981 ::decode(client_name, it);
3982 } catch (const buffer::error &err) {
3983 return -EINVAL;
3984 }
3985
3986 cls::rbd::MirrorPeer peer;
3987 int r = mirror::read_peer(hctx, uuid, &peer);
3988 if (r < 0) {
3989 return r;
3990 }
3991
3992 peer.client_name = client_name;
3993 r = mirror::write_peer(hctx, uuid, peer);
3994 if (r < 0) {
3995 return r;
3996 }
3997 return 0;
3998 }
3999
4000 /**
4001 * Input:
4002 * @param uuid (std::string)
4003 * @param cluster_name (std::string)
4004 *
4005 * Output:
4006 * @returns 0 on success, negative error code on failure
4007 */
4008 int mirror_peer_set_cluster(cls_method_context_t hctx, bufferlist *in,
4009 bufferlist *out) {
4010 std::string uuid;
4011 std::string cluster_name;
4012 try {
4013 bufferlist::iterator it = in->begin();
4014 ::decode(uuid, it);
4015 ::decode(cluster_name, it);
4016 } catch (const buffer::error &err) {
4017 return -EINVAL;
4018 }
4019
4020 cls::rbd::MirrorPeer peer;
4021 int r = mirror::read_peer(hctx, uuid, &peer);
4022 if (r < 0) {
4023 return r;
4024 }
4025
4026 peer.cluster_name = cluster_name;
4027 r = mirror::write_peer(hctx, uuid, peer);
4028 if (r < 0) {
4029 return r;
4030 }
4031 return 0;
4032 }
4033
4034 /**
4035 * Input:
4036 * @param start_after which name to begin listing after
4037 * (use the empty string to start at the beginning)
4038 * @param max_return the maximum number of names to list
4039 *
4040 * Output:
4041 * @param std::map<std::string, std::string>: local id to global id map
4042 * @returns 0 on success, negative error code on failure
4043 */
4044 int mirror_image_list(cls_method_context_t hctx, bufferlist *in,
4045 bufferlist *out) {
4046 std::string start_after;
4047 uint64_t max_return;
4048 try {
4049 bufferlist::iterator iter = in->begin();
4050 ::decode(start_after, iter);
4051 ::decode(max_return, iter);
4052 } catch (const buffer::error &err) {
4053 return -EINVAL;
4054 }
4055
4056 int max_read = RBD_MAX_KEYS_READ;
4057 bool more = true;
4058 std::map<std::string, std::string> mirror_images;
4059 std::string last_read = mirror::image_key(start_after);
4060
4061 while (more && mirror_images.size() < max_return) {
4062 std::map<std::string, bufferlist> vals;
4063 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
4064 int r = cls_cxx_map_get_vals(hctx, last_read, mirror::IMAGE_KEY_PREFIX,
4065 max_read, &vals, &more);
4066 if (r < 0) {
4067 CLS_ERR("error reading mirror image directory by name: %s",
4068 cpp_strerror(r).c_str());
4069 return r;
4070 }
4071
4072 for (auto it = vals.begin(); it != vals.end(); ++it) {
4073 const std::string &image_id =
4074 it->first.substr(mirror::IMAGE_KEY_PREFIX.size());
4075 cls::rbd::MirrorImage mirror_image;
4076 bufferlist::iterator iter = it->second.begin();
4077 try {
4078 ::decode(mirror_image, iter);
4079 } catch (const buffer::error &err) {
4080 CLS_ERR("could not decode mirror image payload of image '%s'",
4081 image_id.c_str());
4082 return -EIO;
4083 }
4084
4085 mirror_images[image_id] = mirror_image.global_image_id;
4086 if (mirror_images.size() >= max_return) {
4087 break;
4088 }
4089 }
4090 if (!vals.empty()) {
4091 last_read = mirror::image_key(mirror_images.rbegin()->first);
4092 }
4093 }
4094
4095 ::encode(mirror_images, *out);
4096 return 0;
4097 }
4098
4099 /**
4100 * Input:
4101 * @param global_id (std::string)
4102 *
4103 * Output:
4104 * @param std::string - image id
4105 * @returns 0 on success, negative error code on failure
4106 */
4107 int mirror_image_get_image_id(cls_method_context_t hctx, bufferlist *in,
4108 bufferlist *out) {
4109 std::string global_id;
4110 try {
4111 bufferlist::iterator it = in->begin();
4112 ::decode(global_id, it);
4113 } catch (const buffer::error &err) {
4114 return -EINVAL;
4115 }
4116
4117 std::string image_id;
4118 int r = read_key(hctx, mirror::global_key(global_id), &image_id);
4119 if (r < 0) {
4120 CLS_ERR("error retrieving image id for global id '%s': %s",
4121 global_id.c_str(), cpp_strerror(r).c_str());
4122 return r;
4123 }
4124
4125 ::encode(image_id, *out);
4126 return 0;
4127 }
4128
4129 /**
4130 * Input:
4131 * @param image_id (std::string)
4132 *
4133 * Output:
4134 * @param cls::rbd::MirrorImage - metadata associated with the image_id
4135 * @returns 0 on success, negative error code on failure
4136 */
4137 int mirror_image_get(cls_method_context_t hctx, bufferlist *in,
4138 bufferlist *out) {
4139 string image_id;
4140 try {
4141 bufferlist::iterator it = in->begin();
4142 ::decode(image_id, it);
4143 } catch (const buffer::error &err) {
4144 return -EINVAL;
4145 }
4146
4147 cls::rbd::MirrorImage mirror_image;
4148 int r = mirror::image_get(hctx, image_id, &mirror_image);
4149 if (r < 0) {
4150 return r;
4151 }
4152
4153 ::encode(mirror_image, *out);
4154 return 0;
4155 }
4156
4157 /**
4158 * Input:
4159 * @param image_id (std::string)
4160 * @param mirror_image (cls::rbd::MirrorImage)
4161 *
4162 * Output:
4163 * @returns 0 on success, negative error code on failure
4164 * @returns -EEXIST if there's an existing image_id with a different global_image_id
4165 */
4166 int mirror_image_set(cls_method_context_t hctx, bufferlist *in,
4167 bufferlist *out) {
4168 string image_id;
4169 cls::rbd::MirrorImage mirror_image;
4170 try {
4171 bufferlist::iterator it = in->begin();
4172 ::decode(image_id, it);
4173 ::decode(mirror_image, it);
4174 } catch (const buffer::error &err) {
4175 return -EINVAL;
4176 }
4177
4178 int r = mirror::image_set(hctx, image_id, mirror_image);
4179 if (r < 0) {
4180 return r;
4181 }
4182 return 0;
4183 }
4184
4185 /**
4186 * Input:
4187 * @param image_id (std::string)
4188 *
4189 * Output:
4190 * @returns 0 on success, negative error code on failure
4191 */
4192 int mirror_image_remove(cls_method_context_t hctx, bufferlist *in,
4193 bufferlist *out) {
4194 string image_id;
4195 try {
4196 bufferlist::iterator it = in->begin();
4197 ::decode(image_id, it);
4198 } catch (const buffer::error &err) {
4199 return -EINVAL;
4200 }
4201
4202 int r = mirror::image_remove(hctx, image_id);
4203 if (r < 0) {
4204 return r;
4205 }
4206 return 0;
4207 }
4208
4209 /**
4210 * Input:
4211 * @param global_image_id (std::string)
4212 * @param status (cls::rbd::MirrorImageStatus)
4213 *
4214 * Output:
4215 * @returns 0 on success, negative error code on failure
4216 */
4217 int mirror_image_status_set(cls_method_context_t hctx, bufferlist *in,
4218 bufferlist *out) {
4219 string global_image_id;
4220 cls::rbd::MirrorImageStatus status;
4221 try {
4222 bufferlist::iterator it = in->begin();
4223 ::decode(global_image_id, it);
4224 ::decode(status, it);
4225 } catch (const buffer::error &err) {
4226 return -EINVAL;
4227 }
4228
4229 int r = mirror::image_status_set(hctx, global_image_id, status);
4230 if (r < 0) {
4231 return r;
4232 }
4233 return 0;
4234 }
4235
4236 /**
4237 * Input:
4238 * @param global_image_id (std::string)
4239 *
4240 * Output:
4241 * @returns 0 on success, negative error code on failure
4242 */
4243 int mirror_image_status_remove(cls_method_context_t hctx, bufferlist *in,
4244 bufferlist *out) {
4245 string global_image_id;
4246 try {
4247 bufferlist::iterator it = in->begin();
4248 ::decode(global_image_id, it);
4249 } catch (const buffer::error &err) {
4250 return -EINVAL;
4251 }
4252
4253 int r = mirror::image_status_remove(hctx, global_image_id);
4254 if (r < 0) {
4255 return r;
4256 }
4257 return 0;
4258 }
4259
4260 /**
4261 * Input:
4262 * @param global_image_id (std::string)
4263 *
4264 * Output:
4265 * @param cls::rbd::MirrorImageStatus - metadata associated with the global_image_id
4266 * @returns 0 on success, negative error code on failure
4267 */
4268 int mirror_image_status_get(cls_method_context_t hctx, bufferlist *in,
4269 bufferlist *out) {
4270 string global_image_id;
4271 try {
4272 bufferlist::iterator it = in->begin();
4273 ::decode(global_image_id, it);
4274 } catch (const buffer::error &err) {
4275 return -EINVAL;
4276 }
4277
4278 cls::rbd::MirrorImageStatus status;
4279 int r = mirror::image_status_get(hctx, global_image_id, &status);
4280 if (r < 0) {
4281 return r;
4282 }
4283
4284 ::encode(status, *out);
4285 return 0;
4286 }
4287
4288 /**
4289 * Input:
4290 * @param start_after which name to begin listing after
4291 * (use the empty string to start at the beginning)
4292 * @param max_return the maximum number of names to list
4293 *
4294 * Output:
4295 * @param std::map<std::string, cls::rbd::MirrorImage>: image id to image map
4296 * @param std::map<std::string, cls::rbd::MirrorImageStatus>: image it to status map
4297 * @returns 0 on success, negative error code on failure
4298 */
4299 int mirror_image_status_list(cls_method_context_t hctx, bufferlist *in,
4300 bufferlist *out) {
4301 std::string start_after;
4302 uint64_t max_return;
4303 try {
4304 bufferlist::iterator iter = in->begin();
4305 ::decode(start_after, iter);
4306 ::decode(max_return, iter);
4307 } catch (const buffer::error &err) {
4308 return -EINVAL;
4309 }
4310
4311 map<std::string, cls::rbd::MirrorImage> images;
4312 map<std::string, cls::rbd::MirrorImageStatus> statuses;
4313 int r = mirror::image_status_list(hctx, start_after, max_return, &images,
4314 &statuses);
4315 if (r < 0) {
4316 return r;
4317 }
4318
4319 ::encode(images, *out);
4320 ::encode(statuses, *out);
4321 return 0;
4322 }
4323
4324 /**
4325 * Input:
4326 * none
4327 *
4328 * Output:
4329 * @param std::map<cls::rbd::MirrorImageStatusState, int>: states counts
4330 * @returns 0 on success, negative error code on failure
4331 */
4332 int mirror_image_status_get_summary(cls_method_context_t hctx, bufferlist *in,
4333 bufferlist *out) {
4334 std::map<cls::rbd::MirrorImageStatusState, int> states;
4335
4336 int r = mirror::image_status_get_summary(hctx, &states);
4337 if (r < 0) {
4338 return r;
4339 }
4340
4341 ::encode(states, *out);
4342 return 0;
4343 }
4344
4345 /**
4346 * Input:
4347 * none
4348 *
4349 * Output:
4350 * @returns 0 on success, negative error code on failure
4351 */
4352 int mirror_image_status_remove_down(cls_method_context_t hctx, bufferlist *in,
4353 bufferlist *out) {
4354 int r = mirror::image_status_remove_down(hctx);
4355 if (r < 0) {
4356 return r;
4357 }
4358 return 0;
4359 }
4360
4361 /**
4362 * Input:
4363 * none
4364 *
4365 * Output:
4366 * @param std::vector<std::string>: instance ids
4367 * @returns 0 on success, negative error code on failure
4368 */
4369 int mirror_instances_list(cls_method_context_t hctx, bufferlist *in,
4370 bufferlist *out) {
4371 std::vector<std::string> instance_ids;
4372
4373 int r = mirror::instances_list(hctx, &instance_ids);
4374 if (r < 0) {
4375 return r;
4376 }
4377
4378 ::encode(instance_ids, *out);
4379 return 0;
4380 }
4381
4382 /**
4383 * Input:
4384 * @param instance_id (std::string)
4385 *
4386 * Output:
4387 * @returns 0 on success, negative error code on failure
4388 */
4389 int mirror_instances_add(cls_method_context_t hctx, bufferlist *in,
4390 bufferlist *out) {
4391 std::string instance_id;
4392 try {
4393 bufferlist::iterator iter = in->begin();
4394 ::decode(instance_id, iter);
4395 } catch (const buffer::error &err) {
4396 return -EINVAL;
4397 }
4398
4399 int r = mirror::instances_add(hctx, instance_id);
4400 if (r < 0) {
4401 return r;
4402 }
4403 return 0;
4404 }
4405
4406 /**
4407 * Input:
4408 * @param instance_id (std::string)
4409 *
4410 * Output:
4411 * @returns 0 on success, negative error code on failure
4412 */
4413 int mirror_instances_remove(cls_method_context_t hctx, bufferlist *in,
4414 bufferlist *out) {
4415 std::string instance_id;
4416 try {
4417 bufferlist::iterator iter = in->begin();
4418 ::decode(instance_id, iter);
4419 } catch (const buffer::error &err) {
4420 return -EINVAL;
4421 }
4422
4423 int r = mirror::instances_remove(hctx, instance_id);
4424 if (r < 0) {
4425 return r;
4426 }
4427 return 0;
4428 }
4429
4430 /**
4431 * Initialize the header with basic metadata.
4432 * Everything is stored as key/value pairs as omaps in the header object.
4433 *
4434 * Input:
4435 * none
4436 *
4437 * Output:
4438 * @return 0 on success, negative error code on failure
4439 */
4440 int group_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4441 {
4442 bufferlist snap_seqbl;
4443 uint64_t snap_seq = 0;
4444 ::encode(snap_seq, snap_seqbl);
4445 int r = cls_cxx_map_set_val(hctx, GROUP_SNAP_SEQ, &snap_seqbl);
4446 if (r < 0)
4447 return r;
4448
4449 return 0;
4450 }
4451
4452 /**
4453 * List consistency groups from the directory.
4454 *
4455 * Input:
4456 * @param start_after (std::string)
4457 * @param max_return (int64_t)
4458 *
4459 * Output:
4460 * @param map of consistency groups (name, id)
4461 * @return 0 on success, negative error code on failure
4462 */
4463 int group_dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4464 {
4465 string start_after;
4466 uint64_t max_return;
4467
4468 try {
4469 bufferlist::iterator iter = in->begin();
4470 ::decode(start_after, iter);
4471 ::decode(max_return, iter);
4472 } catch (const buffer::error &err) {
4473 return -EINVAL;
4474 }
4475
4476 int max_read = RBD_MAX_KEYS_READ;
4477 bool more = true;
4478 map<string, string> groups;
4479 string last_read = dir_key_for_name(start_after);
4480
4481 while (more && groups.size() < max_return) {
4482 map<string, bufferlist> vals;
4483 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
4484 int r = cls_cxx_map_get_vals(hctx, last_read, RBD_DIR_NAME_KEY_PREFIX,
4485 max_read, &vals, &more);
4486 if (r < 0) {
4487 CLS_ERR("error reading directory by name: %s", cpp_strerror(r).c_str());
4488 return r;
4489 }
4490
4491 for (pair<string, bufferlist> val: vals) {
4492 string id;
4493 bufferlist::iterator iter = val.second.begin();
4494 try {
4495 ::decode(id, iter);
4496 } catch (const buffer::error &err) {
4497 CLS_ERR("could not decode id of consistency group '%s'", val.first.c_str());
4498 return -EIO;
4499 }
4500 CLS_LOG(20, "adding '%s' -> '%s'", dir_name_from_key(val.first).c_str(), id.c_str());
4501 groups[dir_name_from_key(val.first)] = id;
4502 if (groups.size() >= max_return)
4503 break;
4504 }
4505 if (!vals.empty()) {
4506 last_read = dir_key_for_name(groups.rbegin()->first);
4507 }
4508 }
4509
4510 ::encode(groups, *out);
4511
4512 return 0;
4513 }
4514
4515 /**
4516 * Add a consistency group to the directory.
4517 *
4518 * Input:
4519 * @param name (std::string)
4520 * @param id (std::string)
4521 *
4522 * Output:
4523 * @return 0 on success, negative error code on failure
4524 */
4525 int group_dir_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4526 {
4527 int r = cls_cxx_create(hctx, false);
4528
4529 if (r < 0) {
4530 CLS_ERR("could not create consistency group directory: %s",
4531 cpp_strerror(r).c_str());
4532 return r;
4533 }
4534
4535 string name, id;
4536 try {
4537 bufferlist::iterator iter = in->begin();
4538 ::decode(name, iter);
4539 ::decode(id, iter);
4540 } catch (const buffer::error &err) {
4541 return -EINVAL;
4542 }
4543
4544 if (!name.size() || !is_valid_id(id)) {
4545 CLS_ERR("invalid consistency group name '%s' or id '%s'",
4546 name.c_str(), id.c_str());
4547 return -EINVAL;
4548 }
4549
4550 CLS_LOG(20, "group_dir_add name=%s id=%s", name.c_str(), id.c_str());
4551
4552 string tmp;
4553 string name_key = dir_key_for_name(name);
4554 string id_key = dir_key_for_id(id);
4555 r = read_key(hctx, name_key, &tmp);
4556 if (r != -ENOENT) {
4557 CLS_LOG(10, "name already exists");
4558 return -EEXIST;
4559 }
4560 r = read_key(hctx, id_key, &tmp);
4561 if (r != -ENOENT) {
4562 CLS_LOG(10, "id already exists");
4563 return -EBADF;
4564 }
4565 bufferlist id_bl, name_bl;
4566 ::encode(id, id_bl);
4567 ::encode(name, name_bl);
4568 map<string, bufferlist> omap_vals;
4569 omap_vals[name_key] = id_bl;
4570 omap_vals[id_key] = name_bl;
4571 return cls_cxx_map_set_vals(hctx, &omap_vals);
4572 }
4573
4574 /**
4575 * Remove a consistency group from the directory.
4576 *
4577 * Input:
4578 * @param name (std::string)
4579 * @param id (std::string)
4580 *
4581 * Output:
4582 * @return 0 on success, negative error code on failure
4583 */
4584 int group_dir_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4585 {
4586 string name, id;
4587 try {
4588 bufferlist::iterator iter = in->begin();
4589 ::decode(name, iter);
4590 ::decode(id, iter);
4591 } catch (const buffer::error &err) {
4592 return -EINVAL;
4593 }
4594
4595 CLS_LOG(20, "group_dir_remove name=%s id=%s", name.c_str(), id.c_str());
4596
4597 string stored_name, stored_id;
4598 string name_key = dir_key_for_name(name);
4599 string id_key = dir_key_for_id(id);
4600
4601 int r = read_key(hctx, name_key, &stored_id);
4602 if (r < 0) {
4603 if (r != -ENOENT)
4604 CLS_ERR("error reading name to id mapping: %s", cpp_strerror(r).c_str());
4605 return r;
4606 }
4607 r = read_key(hctx, id_key, &stored_name);
4608 if (r < 0) {
4609 if (r != -ENOENT)
4610 CLS_ERR("error reading id to name mapping: %s", cpp_strerror(r).c_str());
4611 return r;
4612 }
4613
4614 // check if this op raced with a rename
4615 if (stored_name != name || stored_id != id) {
4616 CLS_ERR("stored name '%s' and id '%s' do not match args '%s' and '%s'",
4617 stored_name.c_str(), stored_id.c_str(), name.c_str(), id.c_str());
4618 return -ESTALE;
4619 }
4620
4621 r = cls_cxx_map_remove_key(hctx, name_key);
4622 if (r < 0) {
4623 CLS_ERR("error removing name: %s", cpp_strerror(r).c_str());
4624 return r;
4625 }
4626
4627 r = cls_cxx_map_remove_key(hctx, id_key);
4628 if (r < 0) {
4629 CLS_ERR("error removing id: %s", cpp_strerror(r).c_str());
4630 return r;
4631 }
4632
4633 return 0;
4634 }
4635
4636 /**
4637 * Set state of an image in the consistency group.
4638 *
4639 * Input:
4640 * @param image_status (cls::rbd::GroupImageStatus)
4641 *
4642 * Output:
4643 * @return 0 on success, negative error code on failure
4644 */
4645 int group_image_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4646 {
4647 CLS_LOG(20, "group_image_set");
4648
4649 cls::rbd::GroupImageStatus st;
4650 try {
4651 bufferlist::iterator iter = in->begin();
4652 ::decode(st, iter);
4653 } catch (const buffer::error &err) {
4654 return -EINVAL;
4655 }
4656
4657 string image_key = st.spec.image_key();
4658
4659 bufferlist image_val_bl;
4660 ::encode(st.state, image_val_bl);
4661 int r = cls_cxx_map_set_val(hctx, image_key, &image_val_bl);
4662 if (r < 0) {
4663 return r;
4664 }
4665
4666 return 0;
4667 }
4668
4669 /**
4670 * Remove reference to an image from the consistency group.
4671 *
4672 * Input:
4673 * @param spec (cls::rbd::GroupImageSpec)
4674 *
4675 * Output:
4676 * @return 0 on success, negative error code on failure
4677 */
4678 int group_image_remove(cls_method_context_t hctx,
4679 bufferlist *in, bufferlist *out)
4680 {
4681 CLS_LOG(20, "group_image_remove");
4682 cls::rbd::GroupImageSpec spec;
4683 try {
4684 bufferlist::iterator iter = in->begin();
4685 ::decode(spec, iter);
4686 } catch (const buffer::error &err) {
4687 return -EINVAL;
4688 }
4689
4690 string image_key = spec.image_key();
4691
4692 int r = cls_cxx_map_remove_key(hctx, image_key);
4693 if (r < 0) {
4694 CLS_ERR("error removing image from group: %s", cpp_strerror(r).c_str());
4695 return r;
4696 }
4697
4698 return 0;
4699 }
4700
4701 /*
4702 * List images in the consistency group.
4703 *
4704 * Input:
4705 * @param start_after which name to begin listing after
4706 * (use the empty string to start at the beginning)
4707 * @param max_return the maximum number of names to list
4708 *
4709 * Output:
4710 * @param tuples of descriptions of the images: image_id, pool_id, image reference state.
4711 * @return 0 on success, negative error code on failure
4712 */
4713 int group_image_list(cls_method_context_t hctx,
4714 bufferlist *in, bufferlist *out)
4715 {
4716 CLS_LOG(20, "group_image_list");
4717 cls::rbd::GroupImageSpec start_after;
4718 uint64_t max_return;
4719 try {
4720 bufferlist::iterator iter = in->begin();
4721 ::decode(start_after, iter);
4722 ::decode(max_return, iter);
4723 } catch (const buffer::error &err) {
4724 return -EINVAL;
4725 }
4726
4727 int max_read = RBD_MAX_KEYS_READ;
4728 std::map<string, bufferlist> vals;
4729 string last_read = start_after.image_key();
4730 std::vector<cls::rbd::GroupImageStatus> res;
4731 bool more;
4732 do {
4733 int r = cls_cxx_map_get_vals(hctx, last_read,cls::rbd::RBD_GROUP_IMAGE_KEY_PREFIX,
4734 max_read, &vals, &more);
4735 if (r < 0)
4736 return r;
4737
4738 for (map<string, bufferlist>::iterator it = vals.begin();
4739 it != vals.end() && res.size() < max_return; ++it) {
4740
4741 bufferlist::iterator iter = it->second.begin();
4742 cls::rbd::GroupImageLinkState state;
4743 try {
4744 ::decode(state, iter);
4745 } catch (const buffer::error &err) {
4746 CLS_ERR("error decoding state for image: %s", it->first.c_str());
4747 return -EIO;
4748 }
4749 cls::rbd::GroupImageSpec spec;
4750 int r = cls::rbd::GroupImageSpec::from_key(it->first, &spec);
4751 if (r < 0)
4752 return r;
4753
4754 CLS_LOG(20, "Discovered image %s %" PRId64 " %d", spec.image_id.c_str(),
4755 spec.pool_id,
4756 (int)state);
4757 res.push_back(cls::rbd::GroupImageStatus(spec, state));
4758 }
4759 if (res.size() > 0) {
4760 last_read = res.rbegin()->spec.image_key();
4761 }
4762
4763 } while (more && (res.size() < max_return));
4764 ::encode(res, *out);
4765
4766 return 0;
4767 }
4768
4769 /**
4770 * Reference the consistency group this image belongs to.
4771 *
4772 * Input:
4773 * @param group_id (std::string)
4774 * @param pool_id (int64_t)
4775 *
4776 * Output:
4777 * @return 0 on success, negative error code on failure
4778 */
4779 int image_add_group(cls_method_context_t hctx,
4780 bufferlist *in, bufferlist *out)
4781 {
4782 CLS_LOG(20, "image_add_group");
4783 cls::rbd::GroupSpec new_group;
4784 try {
4785 bufferlist::iterator iter = in->begin();
4786 ::decode(new_group, iter);
4787 } catch (const buffer::error &err) {
4788 return -EINVAL;
4789 }
4790
4791 bufferlist existing_refbl;
4792
4793 int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &existing_refbl);
4794 if (r == 0) {
4795 // If we are trying to link this image to the same group then return success.
4796 // If this image already belongs to another group then abort.
4797 cls::rbd::GroupSpec old_group;
4798 try {
4799 bufferlist::iterator iter = existing_refbl.begin();
4800 ::decode(old_group, iter);
4801 } catch (const buffer::error &err) {
4802 return -EINVAL;
4803 }
4804
4805 if ((old_group.group_id != new_group.group_id)
4806 || (old_group.pool_id != new_group.pool_id)) {
4807 return -EEXIST;
4808 } else {
4809 return 0; // In this case the values are already correct
4810 }
4811 } else if (r < 0 && r != -ENOENT) { // No entry means this image is not a member of any consistency group. So, we can use it.
4812 return r;
4813 }
4814
4815 bufferlist refbl;
4816 ::encode(new_group, refbl);
4817 r = cls_cxx_map_set_val(hctx, RBD_GROUP_REF, &refbl);
4818
4819 if (r < 0) {
4820 return r;
4821 }
4822
4823 return 0;
4824 }
4825
4826 /**
4827 * Remove image's pointer to the consistency group.
4828 *
4829 * Input:
4830 * @param cg_id (std::string)
4831 * @param pool_id (int64_t)
4832 *
4833 * Output:
4834 * @return 0 on success, negative error code on failure
4835 */
4836 int image_remove_group(cls_method_context_t hctx,
4837 bufferlist *in,
4838 bufferlist *out)
4839 {
4840 CLS_LOG(20, "image_remove_group");
4841 cls::rbd::GroupSpec spec;
4842 try {
4843 bufferlist::iterator iter = in->begin();
4844 ::decode(spec, iter);
4845 } catch (const buffer::error &err) {
4846 return -EINVAL;
4847 }
4848
4849 bufferlist refbl;
4850 int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &refbl);
4851 if (r < 0) {
4852 return r;
4853 }
4854
4855 cls::rbd::GroupSpec ref_spec;
4856 bufferlist::iterator iter = refbl.begin();
4857 try {
4858 ::decode(ref_spec, iter);
4859 } catch (const buffer::error &err) {
4860 return -EINVAL;
4861 }
4862
4863 if (ref_spec.pool_id != spec.pool_id || ref_spec.group_id != spec.group_id) {
4864 return -EBADF;
4865 }
4866
4867 r = cls_cxx_map_remove_key(hctx, RBD_GROUP_REF);
4868 if (r < 0) {
4869 return r;
4870 }
4871
4872 return 0;
4873 }
4874
4875 /**
4876 * Retrieve the id and pool of the consistency group this image belongs to.
4877 *
4878 * Input:
4879 * none
4880 *
4881 * Output:
4882 * @param GroupSpec
4883 * @return 0 on success, negative error code on failure
4884 */
4885 int image_get_group(cls_method_context_t hctx,
4886 bufferlist *in, bufferlist *out)
4887 {
4888 CLS_LOG(20, "image_get_group");
4889 bufferlist refbl;
4890 int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &refbl);
4891 if (r < 0 && r != -ENOENT) {
4892 return r;
4893 }
4894
4895 cls::rbd::GroupSpec spec;
4896
4897 if (r != -ENOENT) {
4898 bufferlist::iterator iter = refbl.begin();
4899 try {
4900 ::decode(spec, iter);
4901 } catch (const buffer::error &err) {
4902 return -EINVAL;
4903 }
4904 }
4905
4906 ::encode(spec, *out);
4907 return 0;
4908 }
4909
namespace trash {

// Prefix placed in front of an image id to form its omap key.
static const std::string IMAGE_KEY_PREFIX("id_");

// Build the omap key for the given image id.
std::string image_key(const std::string &image_id) {
  std::string key(IMAGE_KEY_PREFIX);
  key.append(image_id);
  return key;
}

// Recover the image id from an omap key by dropping the leading prefix
// length; the key is expected to be at least as long as the prefix.
std::string image_id_from_key(const std::string &key) {
  return std::string(key, IMAGE_KEY_PREFIX.size());
}

} // namespace trash
4923
4924 /**
4925 * Add an image entry to the rbd trash. Creates the trash object if
4926 * needed, and stores the trash spec information of the deleted image.
4927 *
4928 * Input:
4929 * @param id the id of the image
4930 * @param trash_spec the spec info of the deleted image
4931 *
4932 * Output:
4933 * @returns -EEXIST if the image id is already in the trash
4934 * @returns 0 on success, negative error code on failure
4935 */
4936 int trash_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4937 {
4938 int r = cls_cxx_create(hctx, false);
4939 if (r < 0) {
4940 CLS_ERR("could not create trash: %s", cpp_strerror(r).c_str());
4941 return r;
4942 }
4943
4944 string id;
4945 cls::rbd::TrashImageSpec trash_spec;
4946 try {
4947 bufferlist::iterator iter = in->begin();
4948 ::decode(id, iter);
4949 ::decode(trash_spec, iter);
4950 } catch (const buffer::error &err) {
4951 return -EINVAL;
4952 }
4953
4954 if (!is_valid_id(id)) {
4955 CLS_ERR("trash_add: invalid id '%s'", id.c_str());
4956 return -EINVAL;
4957 }
4958
4959 CLS_LOG(20, "trash_add id=%s", id.c_str());
4960
4961 string key = trash::image_key(id);
4962 cls::rbd::TrashImageSpec tmp;
4963 r = read_key(hctx, key, &tmp);
4964 if (r < 0 && r != -ENOENT) {
4965 CLS_ERR("could not read key %s entry from trash: %s", key.c_str(),
4966 cpp_strerror(r).c_str());
4967 return r;
4968 } else if (r == 0) {
4969 CLS_LOG(10, "id already exists");
4970 return -EEXIST;
4971 }
4972
4973 map<string, bufferlist> omap_vals;
4974 ::encode(trash_spec, omap_vals[key]);
4975 return cls_cxx_map_set_vals(hctx, &omap_vals);
4976 }
4977
4978 /**
4979 * Removes an image entry from the rbd trash object.
4980 * image.
4981 *
4982 * Input:
4983 * @param id the id of the image
4984 *
4985 * Output:
4986 * @returns -ENOENT if the image id does not exist in the trash
4987 * @returns 0 on success, negative error code on failure
4988 */
4989 int trash_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4990 {
4991 string id;
4992 try {
4993 bufferlist::iterator iter = in->begin();
4994 ::decode(id, iter);
4995 } catch (const buffer::error &err) {
4996 return -EINVAL;
4997 }
4998
4999 CLS_LOG(20, "trash_remove id=%s", id.c_str());
5000
5001 string key = trash::image_key(id);
5002 bufferlist tmp;
5003 int r = cls_cxx_map_get_val(hctx, key, &tmp);
5004 if (r < 0) {
5005 if (r != -ENOENT) {
5006 CLS_ERR("error reading entry key %s: %s", key.c_str(), cpp_strerror(r).c_str());
5007 }
5008 return r;
5009 }
5010
5011 r = cls_cxx_map_remove_key(hctx, key);
5012 if (r < 0) {
5013 CLS_ERR("error removing entry: %s", cpp_strerror(r).c_str());
5014 return r;
5015 }
5016
5017 return 0;
5018 }
5019
5020 /**
5021 * Returns the list of trash spec entries registered in the rbd_trash
5022 * object.
5023 *
5024 * Input:
5025 * @param start_after which name to begin listing after
5026 * (use the empty string to start at the beginning)
5027 * @param max_return the maximum number of names to list
5028 *
5029 * Output:
5030 * @param data the map between image id and trash spec info
5031 *
5032 * @returns 0 on success, negative error code on failure
5033 */
5034 int trash_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
5035 {
5036 string start_after;
5037 uint64_t max_return;
5038
5039 try {
5040 bufferlist::iterator iter = in->begin();
5041 ::decode(start_after, iter);
5042 ::decode(max_return, iter);
5043 } catch (const buffer::error &err) {
5044 return -EINVAL;
5045 }
5046
5047 map<string, cls::rbd::TrashImageSpec> data;
5048 string last_read = trash::image_key(start_after);
5049 bool more = true;
5050
5051 CLS_LOG(20, "trash_get_images");
5052 while (data.size() < max_return) {
5053 map<string, bufferlist> raw_data;
5054 int max_read = std::min<int32_t>(RBD_MAX_KEYS_READ,
5055 max_return - data.size());
5056 int r = cls_cxx_map_get_vals(hctx, last_read, trash::IMAGE_KEY_PREFIX,
5057 max_read, &raw_data, &more);
5058 if (r < 0) {
5059 CLS_ERR("failed to read the vals off of disk: %s",
5060 cpp_strerror(r).c_str());
5061 return r;
5062 }
5063 if (raw_data.empty()) {
5064 break;
5065 }
5066
5067 map<string, bufferlist>::iterator it = raw_data.begin();
5068 for (; it != raw_data.end(); ++it) {
5069 ::decode(data[trash::image_id_from_key(it->first)], it->second);
5070 }
5071
5072 if (!more) {
5073 break;
5074 }
5075
5076 last_read = raw_data.rbegin()->first;
5077 }
5078
5079 ::encode(data, *out);
5080 return 0;
5081 }
5082
5083 /**
5084 * Returns the trash spec entry of an image registered in the rbd_trash
5085 * object.
5086 *
5087 * Input:
5088 * @param id the id of the image
5089 *
5090 * Output:
5091 * @param out the trash spec entry
5092 *
5093 * @returns 0 on success, negative error code on failure
5094 */
5095 int trash_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
5096 {
5097 string id;
5098 try {
5099 bufferlist::iterator iter = in->begin();
5100 ::decode(id, iter);
5101 } catch (const buffer::error &err) {
5102 return -EINVAL;
5103 }
5104
5105 CLS_LOG(20, "trash_get_image id=%s", id.c_str());
5106
5107
5108 string key = trash::image_key(id);
5109 bufferlist bl;
5110 int r = cls_cxx_map_get_val(hctx, key, out);
5111 if (r != -ENOENT) {
5112 CLS_ERR("error reading image from trash '%s': '%s'", id.c_str(),
5113 cpp_strerror(r).c_str());
5114 }
5115 return r;
5116 }
5117
5118 CLS_INIT(rbd)
5119 {
5120 CLS_LOG(20, "Loaded rbd class!");
5121
5122 cls_handle_t h_class;
5123 cls_method_handle_t h_create;
5124 cls_method_handle_t h_get_features;
5125 cls_method_handle_t h_set_features;
5126 cls_method_handle_t h_get_size;
5127 cls_method_handle_t h_set_size;
5128 cls_method_handle_t h_get_parent;
5129 cls_method_handle_t h_set_parent;
5130 cls_method_handle_t h_get_protection_status;
5131 cls_method_handle_t h_set_protection_status;
5132 cls_method_handle_t h_get_stripe_unit_count;
5133 cls_method_handle_t h_set_stripe_unit_count;
5134 cls_method_handle_t h_get_create_timestamp;
5135 cls_method_handle_t h_get_flags;
5136 cls_method_handle_t h_set_flags;
5137 cls_method_handle_t h_remove_parent;
5138 cls_method_handle_t h_add_child;
5139 cls_method_handle_t h_remove_child;
5140 cls_method_handle_t h_get_children;
5141 cls_method_handle_t h_get_snapcontext;
5142 cls_method_handle_t h_get_object_prefix;
5143 cls_method_handle_t h_get_data_pool;
5144 cls_method_handle_t h_get_snapshot_name;
5145 cls_method_handle_t h_get_snapshot_namespace;
5146 cls_method_handle_t h_get_snapshot_timestamp;
5147 cls_method_handle_t h_snapshot_add;
5148 cls_method_handle_t h_snapshot_remove;
5149 cls_method_handle_t h_snapshot_rename;
5150 cls_method_handle_t h_get_all_features;
5151 cls_method_handle_t h_copyup;
5152 cls_method_handle_t h_get_id;
5153 cls_method_handle_t h_set_id;
5154 cls_method_handle_t h_dir_get_id;
5155 cls_method_handle_t h_dir_get_name;
5156 cls_method_handle_t h_dir_list;
5157 cls_method_handle_t h_dir_add_image;
5158 cls_method_handle_t h_dir_remove_image;
5159 cls_method_handle_t h_dir_rename_image;
5160 cls_method_handle_t h_object_map_load;
5161 cls_method_handle_t h_object_map_save;
5162 cls_method_handle_t h_object_map_resize;
5163 cls_method_handle_t h_object_map_update;
5164 cls_method_handle_t h_object_map_snap_add;
5165 cls_method_handle_t h_object_map_snap_remove;
5166 cls_method_handle_t h_metadata_set;
5167 cls_method_handle_t h_metadata_remove;
5168 cls_method_handle_t h_metadata_list;
5169 cls_method_handle_t h_metadata_get;
5170 cls_method_handle_t h_snapshot_get_limit;
5171 cls_method_handle_t h_snapshot_set_limit;
5172 cls_method_handle_t h_old_snapshots_list;
5173 cls_method_handle_t h_old_snapshot_add;
5174 cls_method_handle_t h_old_snapshot_remove;
5175 cls_method_handle_t h_old_snapshot_rename;
5176 cls_method_handle_t h_mirror_uuid_get;
5177 cls_method_handle_t h_mirror_uuid_set;
5178 cls_method_handle_t h_mirror_mode_get;
5179 cls_method_handle_t h_mirror_mode_set;
5180 cls_method_handle_t h_mirror_peer_list;
5181 cls_method_handle_t h_mirror_peer_add;
5182 cls_method_handle_t h_mirror_peer_remove;
5183 cls_method_handle_t h_mirror_peer_set_client;
5184 cls_method_handle_t h_mirror_peer_set_cluster;
5185 cls_method_handle_t h_mirror_image_list;
5186 cls_method_handle_t h_mirror_image_get_image_id;
5187 cls_method_handle_t h_mirror_image_get;
5188 cls_method_handle_t h_mirror_image_set;
5189 cls_method_handle_t h_mirror_image_remove;
5190 cls_method_handle_t h_mirror_image_status_set;
5191 cls_method_handle_t h_mirror_image_status_remove;
5192 cls_method_handle_t h_mirror_image_status_get;
5193 cls_method_handle_t h_mirror_image_status_list;
5194 cls_method_handle_t h_mirror_image_status_get_summary;
5195 cls_method_handle_t h_mirror_image_status_remove_down;
5196 cls_method_handle_t h_mirror_instances_list;
5197 cls_method_handle_t h_mirror_instances_add;
5198 cls_method_handle_t h_mirror_instances_remove;
5199 cls_method_handle_t h_group_create;
5200 cls_method_handle_t h_group_dir_list;
5201 cls_method_handle_t h_group_dir_add;
5202 cls_method_handle_t h_group_dir_remove;
5203 cls_method_handle_t h_group_image_remove;
5204 cls_method_handle_t h_group_image_list;
5205 cls_method_handle_t h_group_image_set;
5206 cls_method_handle_t h_image_add_group;
5207 cls_method_handle_t h_image_remove_group;
5208 cls_method_handle_t h_image_get_group;
5209 cls_method_handle_t h_trash_add;
5210 cls_method_handle_t h_trash_remove;
5211 cls_method_handle_t h_trash_list;
5212 cls_method_handle_t h_trash_get;
5213
5214 cls_register("rbd", &h_class);
5215 cls_register_cxx_method(h_class, "create",
5216 CLS_METHOD_RD | CLS_METHOD_WR,
5217 create, &h_create);
5218 cls_register_cxx_method(h_class, "get_features",
5219 CLS_METHOD_RD,
5220 get_features, &h_get_features);
5221 cls_register_cxx_method(h_class, "set_features",
5222 CLS_METHOD_RD | CLS_METHOD_WR,
5223 set_features, &h_set_features);
5224 cls_register_cxx_method(h_class, "get_size",
5225 CLS_METHOD_RD,
5226 get_size, &h_get_size);
5227 cls_register_cxx_method(h_class, "set_size",
5228 CLS_METHOD_RD | CLS_METHOD_WR,
5229 set_size, &h_set_size);
5230 cls_register_cxx_method(h_class, "get_snapcontext",
5231 CLS_METHOD_RD,
5232 get_snapcontext, &h_get_snapcontext);
5233 cls_register_cxx_method(h_class, "get_object_prefix",
5234 CLS_METHOD_RD,
5235 get_object_prefix, &h_get_object_prefix);
5236 cls_register_cxx_method(h_class, "get_data_pool", CLS_METHOD_RD,
5237 get_data_pool, &h_get_data_pool);
5238 cls_register_cxx_method(h_class, "get_snapshot_name",
5239 CLS_METHOD_RD,
5240 get_snapshot_name, &h_get_snapshot_name);
5241 cls_register_cxx_method(h_class, "get_snapshot_namespace",
5242 CLS_METHOD_RD,
5243 get_snapshot_namespace, &h_get_snapshot_namespace);
5244 cls_register_cxx_method(h_class, "get_snapshot_timestamp",
5245 CLS_METHOD_RD,
5246 get_snapshot_timestamp, &h_get_snapshot_timestamp);
5247 cls_register_cxx_method(h_class, "snapshot_add",
5248 CLS_METHOD_RD | CLS_METHOD_WR,
5249 snapshot_add, &h_snapshot_add);
5250 cls_register_cxx_method(h_class, "snapshot_remove",
5251 CLS_METHOD_RD | CLS_METHOD_WR,
5252 snapshot_remove, &h_snapshot_remove);
5253 cls_register_cxx_method(h_class, "snapshot_rename",
5254 CLS_METHOD_RD | CLS_METHOD_WR,
5255 snapshot_rename, &h_snapshot_rename);
5256 cls_register_cxx_method(h_class, "get_all_features",
5257 CLS_METHOD_RD,
5258 get_all_features, &h_get_all_features);
5259 cls_register_cxx_method(h_class, "copyup",
5260 CLS_METHOD_RD | CLS_METHOD_WR,
5261 copyup, &h_copyup);
5262 cls_register_cxx_method(h_class, "get_parent",
5263 CLS_METHOD_RD,
5264 get_parent, &h_get_parent);
5265 cls_register_cxx_method(h_class, "set_parent",
5266 CLS_METHOD_RD | CLS_METHOD_WR,
5267 set_parent, &h_set_parent);
5268 cls_register_cxx_method(h_class, "remove_parent",
5269 CLS_METHOD_RD | CLS_METHOD_WR,
5270 remove_parent, &h_remove_parent);
5271 cls_register_cxx_method(h_class, "set_protection_status",
5272 CLS_METHOD_RD | CLS_METHOD_WR,
5273 set_protection_status, &h_set_protection_status);
5274 cls_register_cxx_method(h_class, "get_protection_status",
5275 CLS_METHOD_RD,
5276 get_protection_status, &h_get_protection_status);
5277 cls_register_cxx_method(h_class, "get_stripe_unit_count",
5278 CLS_METHOD_RD,
5279 get_stripe_unit_count, &h_get_stripe_unit_count);
5280 cls_register_cxx_method(h_class, "set_stripe_unit_count",
5281 CLS_METHOD_RD | CLS_METHOD_WR,
5282 set_stripe_unit_count, &h_set_stripe_unit_count);
5283 cls_register_cxx_method(h_class, "get_create_timestamp",
5284 CLS_METHOD_RD,
5285 get_create_timestamp, &h_get_create_timestamp);
5286 cls_register_cxx_method(h_class, "get_flags",
5287 CLS_METHOD_RD,
5288 get_flags, &h_get_flags);
5289 cls_register_cxx_method(h_class, "set_flags",
5290 CLS_METHOD_RD | CLS_METHOD_WR,
5291 set_flags, &h_set_flags);
5292 cls_register_cxx_method(h_class, "metadata_list",
5293 CLS_METHOD_RD,
5294 metadata_list, &h_metadata_list);
5295 cls_register_cxx_method(h_class, "metadata_set",
5296 CLS_METHOD_RD | CLS_METHOD_WR,
5297 metadata_set, &h_metadata_set);
5298 cls_register_cxx_method(h_class, "metadata_remove",
5299 CLS_METHOD_RD | CLS_METHOD_WR,
5300 metadata_remove, &h_metadata_remove);
5301 cls_register_cxx_method(h_class, "metadata_get",
5302 CLS_METHOD_RD,
5303 metadata_get, &h_metadata_get);
5304 cls_register_cxx_method(h_class, "snapshot_get_limit",
5305 CLS_METHOD_RD,
5306 snapshot_get_limit, &h_snapshot_get_limit);
5307 cls_register_cxx_method(h_class, "snapshot_set_limit",
5308 CLS_METHOD_WR,
5309 snapshot_set_limit, &h_snapshot_set_limit);
5310
5311 /* methods for the rbd_children object */
5312 cls_register_cxx_method(h_class, "add_child",
5313 CLS_METHOD_RD | CLS_METHOD_WR,
5314 add_child, &h_add_child);
5315 cls_register_cxx_method(h_class, "remove_child",
5316 CLS_METHOD_RD | CLS_METHOD_WR,
5317 remove_child, &h_remove_child);
5318 cls_register_cxx_method(h_class, "get_children",
5319 CLS_METHOD_RD,
5320 get_children, &h_get_children);
5321
5322 /* methods for the rbd_id.$image_name objects */
5323 cls_register_cxx_method(h_class, "get_id",
5324 CLS_METHOD_RD,
5325 get_id, &h_get_id);
5326 cls_register_cxx_method(h_class, "set_id",
5327 CLS_METHOD_RD | CLS_METHOD_WR,
5328 set_id, &h_set_id);
5329
5330 /* methods for the rbd_directory object */
5331 cls_register_cxx_method(h_class, "dir_get_id",
5332 CLS_METHOD_RD,
5333 dir_get_id, &h_dir_get_id);
5334 cls_register_cxx_method(h_class, "dir_get_name",
5335 CLS_METHOD_RD,
5336 dir_get_name, &h_dir_get_name);
5337 cls_register_cxx_method(h_class, "dir_list",
5338 CLS_METHOD_RD,
5339 dir_list, &h_dir_list);
5340 cls_register_cxx_method(h_class, "dir_add_image",
5341 CLS_METHOD_RD | CLS_METHOD_WR,
5342 dir_add_image, &h_dir_add_image);
5343 cls_register_cxx_method(h_class, "dir_remove_image",
5344 CLS_METHOD_RD | CLS_METHOD_WR,
5345 dir_remove_image, &h_dir_remove_image);
5346 cls_register_cxx_method(h_class, "dir_rename_image",
5347 CLS_METHOD_RD | CLS_METHOD_WR,
5348 dir_rename_image, &h_dir_rename_image);
5349
5350 /* methods for the rbd_object_map.$image_id object */
5351 cls_register_cxx_method(h_class, "object_map_load",
5352 CLS_METHOD_RD,
5353 object_map_load, &h_object_map_load);
5354 cls_register_cxx_method(h_class, "object_map_save",
5355 CLS_METHOD_RD | CLS_METHOD_WR,
5356 object_map_save, &h_object_map_save);
5357 cls_register_cxx_method(h_class, "object_map_resize",
5358 CLS_METHOD_RD | CLS_METHOD_WR,
5359 object_map_resize, &h_object_map_resize);
5360 cls_register_cxx_method(h_class, "object_map_update",
5361 CLS_METHOD_RD | CLS_METHOD_WR,
5362 object_map_update, &h_object_map_update);
5363 cls_register_cxx_method(h_class, "object_map_snap_add",
5364 CLS_METHOD_RD | CLS_METHOD_WR,
5365 object_map_snap_add, &h_object_map_snap_add);
5366 cls_register_cxx_method(h_class, "object_map_snap_remove",
5367 CLS_METHOD_RD | CLS_METHOD_WR,
5368 object_map_snap_remove, &h_object_map_snap_remove);
5369
5370 /* methods for the old format */
5371 cls_register_cxx_method(h_class, "snap_list",
5372 CLS_METHOD_RD,
5373 old_snapshots_list, &h_old_snapshots_list);
5374 cls_register_cxx_method(h_class, "snap_add",
5375 CLS_METHOD_RD | CLS_METHOD_WR,
5376 old_snapshot_add, &h_old_snapshot_add);
5377 cls_register_cxx_method(h_class, "snap_remove",
5378 CLS_METHOD_RD | CLS_METHOD_WR,
5379 old_snapshot_remove, &h_old_snapshot_remove);
5380 cls_register_cxx_method(h_class, "snap_rename",
5381 CLS_METHOD_RD | CLS_METHOD_WR,
5382 old_snapshot_rename, &h_old_snapshot_rename);
5383
5384 /* methods for the rbd_mirroring object */
5385 cls_register_cxx_method(h_class, "mirror_uuid_get", CLS_METHOD_RD,
5386 mirror_uuid_get, &h_mirror_uuid_get);
5387 cls_register_cxx_method(h_class, "mirror_uuid_set",
5388 CLS_METHOD_RD | CLS_METHOD_WR,
5389 mirror_uuid_set, &h_mirror_uuid_set);
5390 cls_register_cxx_method(h_class, "mirror_mode_get", CLS_METHOD_RD,
5391 mirror_mode_get, &h_mirror_mode_get);
5392 cls_register_cxx_method(h_class, "mirror_mode_set",
5393 CLS_METHOD_RD | CLS_METHOD_WR,
5394 mirror_mode_set, &h_mirror_mode_set);
5395 cls_register_cxx_method(h_class, "mirror_peer_list", CLS_METHOD_RD,
5396 mirror_peer_list, &h_mirror_peer_list);
5397 cls_register_cxx_method(h_class, "mirror_peer_add",
5398 CLS_METHOD_RD | CLS_METHOD_WR,
5399 mirror_peer_add, &h_mirror_peer_add);
5400 cls_register_cxx_method(h_class, "mirror_peer_remove",
5401 CLS_METHOD_RD | CLS_METHOD_WR,
5402 mirror_peer_remove, &h_mirror_peer_remove);
5403 cls_register_cxx_method(h_class, "mirror_peer_set_client",
5404 CLS_METHOD_RD | CLS_METHOD_WR,
5405 mirror_peer_set_client, &h_mirror_peer_set_client);
5406 cls_register_cxx_method(h_class, "mirror_peer_set_cluster",
5407 CLS_METHOD_RD | CLS_METHOD_WR,
5408 mirror_peer_set_cluster, &h_mirror_peer_set_cluster);
5409 cls_register_cxx_method(h_class, "mirror_image_list", CLS_METHOD_RD,
5410 mirror_image_list, &h_mirror_image_list);
5411 cls_register_cxx_method(h_class, "mirror_image_get_image_id", CLS_METHOD_RD,
5412 mirror_image_get_image_id,
5413 &h_mirror_image_get_image_id);
5414 cls_register_cxx_method(h_class, "mirror_image_get", CLS_METHOD_RD,
5415 mirror_image_get, &h_mirror_image_get);
5416 cls_register_cxx_method(h_class, "mirror_image_set",
5417 CLS_METHOD_RD | CLS_METHOD_WR,
5418 mirror_image_set, &h_mirror_image_set);
5419 cls_register_cxx_method(h_class, "mirror_image_remove",
5420 CLS_METHOD_RD | CLS_METHOD_WR,
5421 mirror_image_remove, &h_mirror_image_remove);
5422 cls_register_cxx_method(h_class, "mirror_image_status_set",
5423 CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PROMOTE,
5424 mirror_image_status_set, &h_mirror_image_status_set);
5425 cls_register_cxx_method(h_class, "mirror_image_status_remove",
5426 CLS_METHOD_RD | CLS_METHOD_WR,
5427 mirror_image_status_remove,
5428 &h_mirror_image_status_remove);
5429 cls_register_cxx_method(h_class, "mirror_image_status_get", CLS_METHOD_RD,
5430 mirror_image_status_get, &h_mirror_image_status_get);
5431 cls_register_cxx_method(h_class, "mirror_image_status_list", CLS_METHOD_RD,
5432 mirror_image_status_list,
5433 &h_mirror_image_status_list);
5434 cls_register_cxx_method(h_class, "mirror_image_status_get_summary",
5435 CLS_METHOD_RD, mirror_image_status_get_summary,
5436 &h_mirror_image_status_get_summary);
5437 cls_register_cxx_method(h_class, "mirror_image_status_remove_down",
5438 CLS_METHOD_RD | CLS_METHOD_WR,
5439 mirror_image_status_remove_down,
5440 &h_mirror_image_status_remove_down);
5441 cls_register_cxx_method(h_class, "mirror_instances_list", CLS_METHOD_RD,
5442 mirror_instances_list, &h_mirror_instances_list);
5443 cls_register_cxx_method(h_class, "mirror_instances_add",
5444 CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PROMOTE,
5445 mirror_instances_add, &h_mirror_instances_add);
5446 cls_register_cxx_method(h_class, "mirror_instances_remove",
5447 CLS_METHOD_RD | CLS_METHOD_WR,
5448 mirror_instances_remove,
5449 &h_mirror_instances_remove);
5450 /* methods for the consistency groups feature */
5451 cls_register_cxx_method(h_class, "group_create",
5452 CLS_METHOD_RD | CLS_METHOD_WR,
5453 group_create, &h_group_create);
5454 cls_register_cxx_method(h_class, "group_dir_list",
5455 CLS_METHOD_RD,
5456 group_dir_list, &h_group_dir_list);
5457 cls_register_cxx_method(h_class, "group_dir_add",
5458 CLS_METHOD_RD | CLS_METHOD_WR,
5459 group_dir_add, &h_group_dir_add);
5460 cls_register_cxx_method(h_class, "group_dir_remove",
5461 CLS_METHOD_RD | CLS_METHOD_WR,
5462 group_dir_remove, &h_group_dir_remove);
5463 cls_register_cxx_method(h_class, "group_image_remove",
5464 CLS_METHOD_RD | CLS_METHOD_WR,
5465 group_image_remove, &h_group_image_remove);
5466 cls_register_cxx_method(h_class, "group_image_list",
5467 CLS_METHOD_RD | CLS_METHOD_WR,
5468 group_image_list, &h_group_image_list);
5469 cls_register_cxx_method(h_class, "group_image_set",
5470 CLS_METHOD_RD | CLS_METHOD_WR,
5471 group_image_set, &h_group_image_set);
5472 cls_register_cxx_method(h_class, "image_add_group",
5473 CLS_METHOD_RD | CLS_METHOD_WR,
5474 image_add_group, &h_image_add_group);
5475 cls_register_cxx_method(h_class, "image_remove_group",
5476 CLS_METHOD_RD | CLS_METHOD_WR,
5477 image_remove_group, &h_image_remove_group);
5478 cls_register_cxx_method(h_class, "image_get_group",
5479 CLS_METHOD_RD,
5480 image_get_group, &h_image_get_group);
5481
5482 /* rbd_trash object methods */
5483 cls_register_cxx_method(h_class, "trash_add",
5484 CLS_METHOD_RD | CLS_METHOD_WR,
5485 trash_add, &h_trash_add);
5486 cls_register_cxx_method(h_class, "trash_remove",
5487 CLS_METHOD_RD | CLS_METHOD_WR,
5488 trash_remove, &h_trash_remove);
5489 cls_register_cxx_method(h_class, "trash_list",
5490 CLS_METHOD_RD,
5491 trash_list, &h_trash_list);
5492 cls_register_cxx_method(h_class, "trash_get",
5493 CLS_METHOD_RD,
5494 trash_get, &h_trash_get);
5495
5496 return;
5497 }