]> git.proxmox.com Git - ceph.git/blame - ceph/src/cls/rbd/cls_rbd.cc
update sources to v12.1.0
[ceph.git] / ceph / src / cls / rbd / cls_rbd.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4/** \file
5 *
6 * This is an OSD class that implements methods for
7 * use with rbd.
8 *
9 * Most of these deal with the rbd header object. Methods prefixed
10 * with old_ deal with the original rbd design, in which clients read
11 * and interpreted the header object directly.
12 *
13 * The new format is meant to be opaque to clients - all their
14 * interactions with non-data objects should go through this
15 * class. The OSD class interface leaves the class to implement its
16 * own argument and payload serialization/deserialization, so for ease
17 * of implementation we use the existing ceph encoding/decoding
18 * methods. Something like json might be preferable, but the rbd
19 * kernel module has to be able to understand format as well. The
20 * datatypes exposed to the clients are strings, unsigned integers,
21 * and vectors of those types. The on-wire format can be found in
22 * src/include/encoding.h.
23 *
24 * The methods for interacting with the new format document their
25 * parameters as the client sees them - it would be silly to mention
26 * in each one that they take an input and an output bufferlist.
27 */
7c673cae
FG
28#include "include/types.h"
29
30#include <algorithm>
7c673cae 31#include <errno.h>
7c673cae 32#include <sstream>
7c673cae
FG
33
34#include "common/bit_vector.hpp"
35#include "common/errno.h"
36#include "objclass/objclass.h"
37#include "osd/osd_types.h"
38#include "include/rbd_types.h"
39#include "include/rbd/object_map_types.h"
40
41#include "cls/rbd/cls_rbd.h"
42#include "cls/rbd/cls_rbd_types.h"
43
44
45/*
46 * Object keys:
47 *
48 * <partial list>
49 *
50 * stripe_unit: size in bytes of the stripe unit. if not present,
51 * the stripe unit is assumed to match the object size (1 << order).
52 *
53 * stripe_count: number of objects to stripe over before looping back.
54 * if not present or 1, striping is disabled. this is the default.
55 *
56 */
57
58CLS_VER(2,0)
59CLS_NAME(rbd)
60
61#define RBD_MAX_KEYS_READ 64
62#define RBD_SNAP_KEY_PREFIX "snapshot_"
63#define RBD_DIR_ID_KEY_PREFIX "id_"
64#define RBD_DIR_NAME_KEY_PREFIX "name_"
65#define RBD_METADATA_KEY_PREFIX "metadata_"
66
67#define GROUP_SNAP_SEQ "snap_seq"
68
69static int snap_read_header(cls_method_context_t hctx, bufferlist& bl)
70{
71 unsigned snap_count = 0;
72 uint64_t snap_names_len = 0;
73 struct rbd_obj_header_ondisk *header;
74
75 CLS_LOG(20, "snapshots_list");
76
77 while (1) {
78 int len = sizeof(*header) +
79 snap_count * sizeof(struct rbd_obj_snap_ondisk) +
80 snap_names_len;
81
82 int rc = cls_cxx_read(hctx, 0, len, &bl);
83 if (rc < 0)
84 return rc;
85
86 if (bl.length() < sizeof(*header))
87 return -EINVAL;
88
89 header = (struct rbd_obj_header_ondisk *)bl.c_str();
90 assert(header);
91
92 if ((snap_count != header->snap_count) ||
93 (snap_names_len != header->snap_names_len)) {
94 snap_count = header->snap_count;
95 snap_names_len = header->snap_names_len;
96 bl.clear();
97 continue;
98 }
99 break;
100 }
101
102 return 0;
103}
104
105static void key_from_snap_id(snapid_t snap_id, string *out)
106{
107 ostringstream oss;
108 oss << RBD_SNAP_KEY_PREFIX
109 << std::setw(16) << std::setfill('0') << std::hex << snap_id;
110 *out = oss.str();
111}
112
113static snapid_t snap_id_from_key(const string &key)
114{
115 istringstream iss(key);
116 uint64_t id;
117 iss.ignore(strlen(RBD_SNAP_KEY_PREFIX)) >> std::hex >> id;
118 return id;
119}
120
121template<typename T>
122static int read_key(cls_method_context_t hctx, const string &key, T *out)
123{
124 bufferlist bl;
125 int r = cls_cxx_map_get_val(hctx, key, &bl);
126 if (r < 0) {
127 if (r != -ENOENT) {
128 CLS_ERR("error reading omap key %s: %s", key.c_str(), cpp_strerror(r).c_str());
129 }
130 return r;
131 }
132
133 try {
134 bufferlist::iterator it = bl.begin();
135 ::decode(*out, it);
136 } catch (const buffer::error &err) {
137 CLS_ERR("error decoding %s", key.c_str());
138 return -EIO;
139 }
140
141 return 0;
142}
143
144static int remove_key(cls_method_context_t hctx, const string &key) {
145 int r = cls_cxx_map_remove_key(hctx, key);
146 if (r < 0 && r != -ENOENT) {
147 CLS_ERR("failed to remove key: %s", key.c_str());
148 return r;
149 }
150 return 0;
151}
152
153static bool is_valid_id(const string &id) {
154 if (!id.size())
155 return false;
156 for (size_t i = 0; i < id.size(); ++i) {
157 if (!isalnum(id[i])) {
158 return false;
159 }
160 }
161 return true;
162}
163
164/**
165 * Initialize the header with basic metadata.
166 * Extra features may initialize more fields in the future.
167 * Everything is stored as key/value pairs as omaps in the header object.
168 *
169 * If features the OSD does not understand are requested, -ENOSYS is
170 * returned.
171 *
172 * Input:
173 * @param size number of bytes in the image (uint64_t)
174 * @param order bits to shift to determine the size of data objects (uint8_t)
175 * @param features what optional things this image will use (uint64_t)
176 * @param object_prefix a prefix for all the data objects
177 * @param data_pool_id pool id where data objects is stored (int64_t)
178 *
179 * Output:
180 * @return 0 on success, negative error code on failure
181 */
182int create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
183{
184 string object_prefix;
185 uint64_t features, size;
186 uint8_t order;
187 int64_t data_pool_id = -1;
188
189 try {
190 bufferlist::iterator iter = in->begin();
191 ::decode(size, iter);
192 ::decode(order, iter);
193 ::decode(features, iter);
194 ::decode(object_prefix, iter);
195 if (!iter.end()) {
196 ::decode(data_pool_id, iter);
197 }
198 } catch (const buffer::error &err) {
199 return -EINVAL;
200 }
201
202 CLS_LOG(20, "create object_prefix=%s size=%llu order=%u features=%llu",
203 object_prefix.c_str(), (unsigned long long)size, order,
204 (unsigned long long)features);
205
206 if (features & ~RBD_FEATURES_ALL) {
207 return -ENOSYS;
208 }
209
210 if (!object_prefix.size()) {
211 return -EINVAL;
212 }
213
214 bufferlist stored_prefixbl;
215 int r = cls_cxx_map_get_val(hctx, "object_prefix", &stored_prefixbl);
216 if (r != -ENOENT) {
217 CLS_ERR("reading object_prefix returned %d", r);
218 return -EEXIST;
219 }
220
221 bufferlist sizebl;
222 bufferlist orderbl;
223 bufferlist featuresbl;
224 bufferlist object_prefixbl;
225 bufferlist snap_seqbl;
31f18b77 226 bufferlist create_timestampbl;
7c673cae 227 uint64_t snap_seq = 0;
31f18b77 228 utime_t create_timestamp = ceph_clock_now();
7c673cae
FG
229 ::encode(size, sizebl);
230 ::encode(order, orderbl);
231 ::encode(features, featuresbl);
232 ::encode(object_prefix, object_prefixbl);
233 ::encode(snap_seq, snap_seqbl);
31f18b77 234 ::encode(create_timestamp, create_timestampbl);
7c673cae
FG
235
236 map<string, bufferlist> omap_vals;
237 omap_vals["size"] = sizebl;
238 omap_vals["order"] = orderbl;
239 omap_vals["features"] = featuresbl;
240 omap_vals["object_prefix"] = object_prefixbl;
241 omap_vals["snap_seq"] = snap_seqbl;
31f18b77 242 omap_vals["create_timestamp"] = create_timestampbl;
7c673cae
FG
243
244 if (features & RBD_FEATURE_DATA_POOL) {
245 if (data_pool_id == -1) {
246 CLS_ERR("data pool not provided with feature enabled");
247 return -EINVAL;
248 }
249
250 bufferlist data_pool_id_bl;
251 ::encode(data_pool_id, data_pool_id_bl);
252 omap_vals["data_pool_id"] = data_pool_id_bl;
253 } else if (data_pool_id != -1) {
254 CLS_ERR("data pool provided with feature disabled");
255 return -EINVAL;
256 }
257
258 r = cls_cxx_map_set_vals(hctx, &omap_vals);
259 if (r < 0)
260 return r;
261
262 return 0;
263}
264
265/**
266 * Input:
267 * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t) (deprecated)
268 * @param read_only true if the image will be used read-only (bool)
269 *
270 * Output:
271 * @param features list of enabled features for the given snapshot (uint64_t)
272 * @param incompatible incompatible feature bits
273 * @returns 0 on success, negative error code on failure
274 */
275int get_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
276{
277 uint64_t snap_id;
278 bool read_only = false;
279
280 bufferlist::iterator iter = in->begin();
281 try {
282 ::decode(snap_id, iter);
283 if (!iter.end()) {
284 ::decode(read_only, iter);
285 }
286 } catch (const buffer::error &err) {
287 return -EINVAL;
288 }
289
290 CLS_LOG(20, "get_features snap_id=%" PRIu64 ", read_only=%d",
291 snap_id, read_only);
292
293 // NOTE: keep this deprecated snapshot logic to support negative
294 // test cases in older (pre-Infernalis) releases. Remove once older
295 // releases are no longer supported.
296 if (snap_id != CEPH_NOSNAP) {
297 cls_rbd_snap snap;
298 string snapshot_key;
299 key_from_snap_id(snap_id, &snapshot_key);
300 int r = read_key(hctx, snapshot_key, &snap);
301 if (r < 0) {
302 return r;
303 }
304 }
305
306 uint64_t features;
307 int r = read_key(hctx, "features", &features);
308 if (r < 0) {
309 CLS_ERR("failed to read features off disk: %s", cpp_strerror(r).c_str());
310 return r;
311 }
312
313 uint64_t incompatible = (read_only ? features & RBD_FEATURES_INCOMPATIBLE :
314 features & RBD_FEATURES_RW_INCOMPATIBLE);
315 ::encode(features, *out);
316 ::encode(incompatible, *out);
317 return 0;
318}
319
320/**
321 * set the image features
322 *
323 * Input:
324 * @param features image features
325 * @param mask image feature mask
326 *
327 * Output:
328 * none
329 *
330 * @returns 0 on success, negative error code upon failure
331 */
332int set_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
333{
334 uint64_t features;
335 uint64_t mask;
336 bufferlist::iterator iter = in->begin();
337 try {
338 ::decode(features, iter);
339 ::decode(mask, iter);
340 } catch (const buffer::error &err) {
341 return -EINVAL;
342 }
343
344 // check that features exists to make sure this is a header object
345 // that was created correctly
346 uint64_t orig_features = 0;
347 int r = read_key(hctx, "features", &orig_features);
348 if (r < 0 && r != -ENOENT) {
349 CLS_ERR("Could not read image's features off disk: %s",
350 cpp_strerror(r).c_str());
351 return r;
352 }
353
354 // newer clients might attempt to mask off features we don't support
355 mask &= RBD_FEATURES_ALL;
356
357 uint64_t enabled_features = features & mask;
358 if ((enabled_features & RBD_FEATURES_MUTABLE) != enabled_features) {
359 CLS_ERR("Attempting to enable immutable feature: %" PRIu64,
360 static_cast<uint64_t>(enabled_features & ~RBD_FEATURES_MUTABLE));
361 return -EINVAL;
362 }
363
364 uint64_t disabled_features = ~features & mask;
365 uint64_t disable_mask = (RBD_FEATURES_MUTABLE | RBD_FEATURES_DISABLE_ONLY);
366 if ((disabled_features & disable_mask) != disabled_features) {
367 CLS_ERR("Attempting to disable immutable feature: %" PRIu64,
368 enabled_features & ~disable_mask);
369 return -EINVAL;
370 }
371
372 features = (orig_features & ~mask) | (features & mask);
373 CLS_LOG(10, "set_features features=%" PRIu64 " orig_features=%" PRIu64,
374 features, orig_features);
375
376 bufferlist bl;
377 ::encode(features, bl);
378 r = cls_cxx_map_set_val(hctx, "features", &bl);
379 if (r < 0) {
380 CLS_ERR("error updating features: %s", cpp_strerror(r).c_str());
381 return r;
382 }
383 return 0;
384}
385
386/**
387 * check that given feature(s) are set
388 *
389 * @param hctx context
390 * @param need features needed
391 * @return 0 if features are set, negative error (like ENOEXEC) otherwise
392 */
393int require_feature(cls_method_context_t hctx, uint64_t need)
394{
395 uint64_t features;
396 int r = read_key(hctx, "features", &features);
397 if (r == -ENOENT) // this implies it's an old-style image with no features
398 return -ENOEXEC;
399 if (r < 0)
400 return r;
401 if ((features & need) != need) {
402 CLS_LOG(10, "require_feature missing feature %llx, have %llx",
403 (unsigned long long)need, (unsigned long long)features);
404 return -ENOEXEC;
405 }
406 return 0;
407}
408
409/**
410 * Input:
411 * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t)
412 *
413 * Output:
414 * @param order bits to shift to get the size of data objects (uint8_t)
415 * @param size size of the image in bytes for the given snapshot (uint64_t)
416 * @returns 0 on success, negative error code on failure
417 */
418int get_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
419{
420 uint64_t snap_id, size;
421 uint8_t order;
422
423 bufferlist::iterator iter = in->begin();
424 try {
425 ::decode(snap_id, iter);
426 } catch (const buffer::error &err) {
427 return -EINVAL;
428 }
429
430 CLS_LOG(20, "get_size snap_id=%llu", (unsigned long long)snap_id);
431
432 int r = read_key(hctx, "order", &order);
433 if (r < 0) {
434 CLS_ERR("failed to read the order off of disk: %s", cpp_strerror(r).c_str());
435 return r;
436 }
437
438 if (snap_id == CEPH_NOSNAP) {
439 r = read_key(hctx, "size", &size);
440 if (r < 0) {
441 CLS_ERR("failed to read the image's size off of disk: %s", cpp_strerror(r).c_str());
442 return r;
443 }
444 } else {
445 cls_rbd_snap snap;
446 string snapshot_key;
447 key_from_snap_id(snap_id, &snapshot_key);
448 int r = read_key(hctx, snapshot_key, &snap);
449 if (r < 0)
450 return r;
451
452 size = snap.image_size;
453 }
454
455 ::encode(order, *out);
456 ::encode(size, *out);
457
458 return 0;
459}
460
461/**
462 * Input:
463 * @param size new capacity of the image in bytes (uint64_t)
464 *
465 * Output:
466 * @returns 0 on success, negative error code on failure
467 */
468int set_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
469{
470 uint64_t size;
471
472 bufferlist::iterator iter = in->begin();
473 try {
474 ::decode(size, iter);
475 } catch (const buffer::error &err) {
476 return -EINVAL;
477 }
478
479 // check that size exists to make sure this is a header object
480 // that was created correctly
481 uint64_t orig_size;
482 int r = read_key(hctx, "size", &orig_size);
483 if (r < 0) {
484 CLS_ERR("Could not read image's size off disk: %s", cpp_strerror(r).c_str());
485 return r;
486 }
487
488 CLS_LOG(20, "set_size size=%llu orig_size=%llu", (unsigned long long)size,
489 (unsigned long long)orig_size);
490
491 bufferlist sizebl;
492 ::encode(size, sizebl);
493 r = cls_cxx_map_set_val(hctx, "size", &sizebl);
494 if (r < 0) {
495 CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
496 return r;
497 }
498
499 // if we are shrinking, and have a parent, shrink our overlap with
500 // the parent, too.
501 if (size < orig_size) {
502 cls_rbd_parent parent;
503 r = read_key(hctx, "parent", &parent);
504 if (r == -ENOENT)
505 r = 0;
506 if (r < 0)
507 return r;
508 if (parent.exists() && parent.overlap > size) {
509 bufferlist parentbl;
510 parent.overlap = size;
511 ::encode(parent, parentbl);
512 r = cls_cxx_map_set_val(hctx, "parent", &parentbl);
513 if (r < 0) {
514 CLS_ERR("error writing parent: %s", cpp_strerror(r).c_str());
515 return r;
516 }
517 }
518 }
519
520 return 0;
521}
522
523/**
524 * verify that the header object exists
525 *
526 * @return 0 if the object exists, -ENOENT if it does not, or other error
527 */
528int check_exists(cls_method_context_t hctx)
529{
530 uint64_t size;
531 time_t mtime;
532 return cls_cxx_stat(hctx, &size, &mtime);
533}
534
535/**
536 * get the current protection status of the specified snapshot
537 *
538 * Input:
539 * @param snap_id (uint64_t) which snapshot to get the status of
540 *
541 * Output:
542 * @param status (uint8_t) one of:
543 * RBD_PROTECTION_STATUS_{PROTECTED, UNPROTECTED, UNPROTECTING}
544 *
545 * @returns 0 on success, negative error code on failure
546 * @returns -EINVAL if snapid is CEPH_NOSNAP
547 */
548int get_protection_status(cls_method_context_t hctx, bufferlist *in,
549 bufferlist *out)
550{
551 snapid_t snap_id;
552
553 bufferlist::iterator iter = in->begin();
554 try {
555 ::decode(snap_id, iter);
556 } catch (const buffer::error &err) {
557 CLS_LOG(20, "get_protection_status: invalid decode");
558 return -EINVAL;
559 }
560
561 int r = check_exists(hctx);
562 if (r < 0)
563 return r;
564
565 CLS_LOG(20, "get_protection_status snap_id=%llu",
566 (unsigned long long)snap_id.val);
567
568 if (snap_id == CEPH_NOSNAP)
569 return -EINVAL;
570
571 cls_rbd_snap snap;
572 string snapshot_key;
573 key_from_snap_id(snap_id.val, &snapshot_key);
574 r = read_key(hctx, snapshot_key, &snap);
575 if (r < 0) {
576 CLS_ERR("could not read key for snapshot id %" PRIu64, snap_id.val);
577 return r;
578 }
579
580 if (snap.protection_status >= RBD_PROTECTION_STATUS_LAST) {
581 CLS_ERR("invalid protection status for snap id %llu: %u",
582 (unsigned long long)snap_id.val, snap.protection_status);
583 return -EIO;
584 }
585
586 ::encode(snap.protection_status, *out);
587 return 0;
588}
589
590/**
591 * set the proctection status of a snapshot
592 *
593 * Input:
594 * @param snapid (uint64_t) which snapshot to set the status of
595 * @param status (uint8_t) one of:
596 * RBD_PROTECTION_STATUS_{PROTECTED, UNPROTECTED, UNPROTECTING}
597 *
598 * @returns 0 on success, negative error code on failure
599 * @returns -EINVAL if snapid is CEPH_NOSNAP
600 */
601int set_protection_status(cls_method_context_t hctx, bufferlist *in,
602 bufferlist *out)
603{
604 snapid_t snap_id;
605 uint8_t status;
606
607 bufferlist::iterator iter = in->begin();
608 try {
609 ::decode(snap_id, iter);
610 ::decode(status, iter);
611 } catch (const buffer::error &err) {
612 CLS_LOG(20, "set_protection_status: invalid decode");
613 return -EINVAL;
614 }
615
616 int r = check_exists(hctx);
617 if (r < 0)
618 return r;
619
620 r = require_feature(hctx, RBD_FEATURE_LAYERING);
621 if (r < 0) {
622 CLS_LOG(20, "image does not support layering");
623 return r;
624 }
625
626 CLS_LOG(20, "set_protection_status snapid=%llu status=%u",
627 (unsigned long long)snap_id.val, status);
628
629 if (snap_id == CEPH_NOSNAP)
630 return -EINVAL;
631
632 if (status >= RBD_PROTECTION_STATUS_LAST) {
633 CLS_LOG(10, "invalid protection status for snap id %llu: %u",
634 (unsigned long long)snap_id.val, status);
635 return -EINVAL;
636 }
637
638 cls_rbd_snap snap;
639 string snapshot_key;
640 key_from_snap_id(snap_id.val, &snapshot_key);
641 r = read_key(hctx, snapshot_key, &snap);
642 if (r < 0) {
643 CLS_ERR("could not read key for snapshot id %" PRIu64, snap_id.val);
644 return r;
645 }
646
647 snap.protection_status = status;
648 bufferlist snapshot_bl;
649 ::encode(snap, snapshot_bl);
650 r = cls_cxx_map_set_val(hctx, snapshot_key, &snapshot_bl);
651 if (r < 0) {
652 CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
653 return r;
654 }
655
656 return 0;
657}
658
659/**
660 * get striping parameters
661 *
662 * Input:
663 * none
664 *
665 * Output:
666 * @param stripe unit (bytes)
667 * @param stripe count (num objects)
668 *
669 * @returns 0 on success
670 */
671int get_stripe_unit_count(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
672{
673 int r = check_exists(hctx);
674 if (r < 0)
675 return r;
676
677 CLS_LOG(20, "get_stripe_unit_count");
678
679 r = require_feature(hctx, RBD_FEATURE_STRIPINGV2);
680 if (r < 0)
681 return r;
682
683 uint64_t stripe_unit = 0, stripe_count = 0;
684 r = read_key(hctx, "stripe_unit", &stripe_unit);
685 if (r == -ENOENT) {
686 // default to object size
687 uint8_t order;
688 r = read_key(hctx, "order", &order);
689 if (r < 0) {
690 CLS_ERR("failed to read the order off of disk: %s", cpp_strerror(r).c_str());
691 return -EIO;
692 }
693 stripe_unit = 1ull << order;
694 }
695 if (r < 0)
696 return r;
697 r = read_key(hctx, "stripe_count", &stripe_count);
698 if (r == -ENOENT) {
699 // default to 1
700 stripe_count = 1;
701 r = 0;
702 }
703 if (r < 0)
704 return r;
705
706 ::encode(stripe_unit, *out);
707 ::encode(stripe_count, *out);
708 return 0;
709}
710
711/**
712 * set striping parameters
713 *
714 * Input:
715 * @param stripe unit (bytes)
716 * @param stripe count (num objects)
717 *
718 * @returns 0 on success
719 */
720int set_stripe_unit_count(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
721{
722 uint64_t stripe_unit, stripe_count;
723
724 bufferlist::iterator iter = in->begin();
725 try {
726 ::decode(stripe_unit, iter);
727 ::decode(stripe_count, iter);
728 } catch (const buffer::error &err) {
729 CLS_LOG(20, "set_stripe_unit_count: invalid decode");
730 return -EINVAL;
731 }
732
733 if (!stripe_count || !stripe_unit)
734 return -EINVAL;
735
736 int r = check_exists(hctx);
737 if (r < 0)
738 return r;
739
740 CLS_LOG(20, "set_stripe_unit_count");
741
742 r = require_feature(hctx, RBD_FEATURE_STRIPINGV2);
743 if (r < 0)
744 return r;
745
746 uint8_t order;
747 r = read_key(hctx, "order", &order);
748 if (r < 0) {
749 CLS_ERR("failed to read the order off of disk: %s", cpp_strerror(r).c_str());
750 return r;
751 }
752 if ((1ull << order) % stripe_unit || stripe_unit > (1ull << order)) {
753 CLS_ERR("stripe unit %llu is not a factor of the object size %llu",
754 (unsigned long long)stripe_unit, 1ull << order);
755 return -EINVAL;
756 }
757
758 bufferlist bl, bl2;
759 ::encode(stripe_unit, bl);
760 r = cls_cxx_map_set_val(hctx, "stripe_unit", &bl);
761 if (r < 0) {
762 CLS_ERR("error writing stripe_unit metadata: %s", cpp_strerror(r).c_str());
763 return r;
764 }
765
766 ::encode(stripe_count, bl2);
767 r = cls_cxx_map_set_val(hctx, "stripe_count", &bl2);
768 if (r < 0) {
769 CLS_ERR("error writing stripe_count metadata: %s", cpp_strerror(r).c_str());
770 return r;
771 }
772
773 return 0;
774}
775
31f18b77
FG
776int get_create_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
777{
778 CLS_LOG(20, "get_create_timestamp");
779
780 utime_t timestamp;
781 bufferlist bl;
782 int r = cls_cxx_map_get_val(hctx, "create_timestamp", &bl);
783 if (r < 0) {
784 if (r != -ENOENT) {
785 CLS_ERR("error reading create_timestamp: %s", cpp_strerror(r).c_str());
786 return r;
787 }
788 } else {
789 try {
790 bufferlist::iterator it = bl.begin();
791 ::decode(timestamp, it);
792 } catch (const buffer::error &err) {
793 CLS_ERR("could not decode create_timestamp");
794 return -EIO;
795 }
796 }
797
798 ::encode(timestamp, *out);
799 return 0;
800}
801
7c673cae
FG
802/**
803 * get the image flags
804 *
805 * Input:
806 * @param snap_id which snapshot to query, to CEPH_NOSNAP (uint64_t)
807 *
808 * Output:
809 * @param flags image flags
810 *
811 * @returns 0 on success, negative error code upon failure
812 */
813int get_flags(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
814{
815 uint64_t snap_id;
816 bufferlist::iterator iter = in->begin();
817 try {
818 ::decode(snap_id, iter);
819 } catch (const buffer::error &err) {
820 return -EINVAL;
821 }
822
823 CLS_LOG(20, "get_flags snap_id=%llu", (unsigned long long)snap_id);
824
825 uint64_t flags = 0;
826 if (snap_id == CEPH_NOSNAP) {
827 int r = read_key(hctx, "flags", &flags);
828 if (r < 0 && r != -ENOENT) {
829 CLS_ERR("failed to read flags off disk: %s", cpp_strerror(r).c_str());
830 return r;
831 }
832 } else {
833 cls_rbd_snap snap;
834 string snapshot_key;
835 key_from_snap_id(snap_id, &snapshot_key);
836 int r = read_key(hctx, snapshot_key, &snap);
837 if (r < 0) {
838 return r;
839 }
840 flags = snap.flags;
841 }
842
843 ::encode(flags, *out);
844 return 0;
845}
846
847/**
848 * set the image flags
849 *
850 * Input:
851 * @param flags image flags
852 * @param mask image flag mask
853 * @param snap_id which snapshot to update, or CEPH_NOSNAP (uint64_t)
854 *
855 * Output:
856 * none
857 *
858 * @returns 0 on success, negative error code upon failure
859 */
860int set_flags(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
861{
862 uint64_t flags;
863 uint64_t mask;
864 uint64_t snap_id = CEPH_NOSNAP;
865 bufferlist::iterator iter = in->begin();
866 try {
867 ::decode(flags, iter);
868 ::decode(mask, iter);
869 if (!iter.end()) {
870 ::decode(snap_id, iter);
871 }
872 } catch (const buffer::error &err) {
873 return -EINVAL;
874 }
875
876 // check that size exists to make sure this is a header object
877 // that was created correctly
878 int r;
879 uint64_t orig_flags = 0;
880 cls_rbd_snap snap_meta;
881 string snap_meta_key;
882 if (snap_id == CEPH_NOSNAP) {
883 r = read_key(hctx, "flags", &orig_flags);
884 if (r < 0 && r != -ENOENT) {
885 CLS_ERR("Could not read image's flags off disk: %s",
886 cpp_strerror(r).c_str());
887 return r;
888 }
889 } else {
890 key_from_snap_id(snap_id, &snap_meta_key);
891 r = read_key(hctx, snap_meta_key, &snap_meta);
892 if (r < 0) {
893 CLS_ERR("Could not read snapshot: snap_id=%" PRIu64 ": %s",
894 snap_id, cpp_strerror(r).c_str());
895 return r;
896 }
897 orig_flags = snap_meta.flags;
898 }
899
900 flags = (orig_flags & ~mask) | (flags & mask);
901 CLS_LOG(20, "set_flags snap_id=%" PRIu64 ", orig_flags=%" PRIu64 ", "
902 "new_flags=%" PRIu64 ", mask=%" PRIu64, snap_id, orig_flags,
903 flags, mask);
904
905 if (snap_id == CEPH_NOSNAP) {
906 bufferlist bl;
907 ::encode(flags, bl);
908 r = cls_cxx_map_set_val(hctx, "flags", &bl);
909 } else {
910 snap_meta.flags = flags;
911
912 bufferlist bl;
913 ::encode(snap_meta, bl);
914 r = cls_cxx_map_set_val(hctx, snap_meta_key, &bl);
915 }
916
917 if (r < 0) {
918 CLS_ERR("error updating flags: %s", cpp_strerror(r).c_str());
919 return r;
920 }
921 return 0;
922}
923
924/**
925 * get the current parent, if any
926 *
927 * Input:
928 * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t)
929 *
930 * Output:
931 * @param pool parent pool id (-1 if parent does not exist)
932 * @param image parent image id
933 * @param snapid parent snapid
934 * @param size portion of parent mapped under the child
935 *
936 * @returns 0 on success or parent does not exist, negative error code on failure
937 */
938int get_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
939{
940 uint64_t snap_id;
941
942 bufferlist::iterator iter = in->begin();
943 try {
944 ::decode(snap_id, iter);
945 } catch (const buffer::error &err) {
946 return -EINVAL;
947 }
948
949 int r = check_exists(hctx);
950 if (r < 0)
951 return r;
952
953 CLS_LOG(20, "get_parent snap_id=%llu", (unsigned long long)snap_id);
954
955 cls_rbd_parent parent;
956 r = require_feature(hctx, RBD_FEATURE_LAYERING);
957 if (r == 0) {
958 if (snap_id == CEPH_NOSNAP) {
959 r = read_key(hctx, "parent", &parent);
960 if (r < 0 && r != -ENOENT)
961 return r;
962 } else {
963 cls_rbd_snap snap;
964 string snapshot_key;
965 key_from_snap_id(snap_id, &snapshot_key);
966 r = read_key(hctx, snapshot_key, &snap);
967 if (r < 0 && r != -ENOENT)
968 return r;
969 parent = snap.parent;
970 }
971 }
972
973 ::encode(parent.pool, *out);
974 ::encode(parent.id, *out);
975 ::encode(parent.snapid, *out);
976 ::encode(parent.overlap, *out);
977 return 0;
978}
979
980/**
981 * set the image parent
982 *
983 * Input:
984 * @param pool parent pool
985 * @param id parent image id
986 * @param snapid parent snapid
987 * @param size parent size
988 *
989 * @returns 0 on success, or negative error code
990 */
991int set_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
992{
993 int64_t pool;
994 string id;
995 snapid_t snapid;
996 uint64_t size;
997
998 bufferlist::iterator iter = in->begin();
999 try {
1000 ::decode(pool, iter);
1001 ::decode(id, iter);
1002 ::decode(snapid, iter);
1003 ::decode(size, iter);
1004 } catch (const buffer::error &err) {
1005 CLS_LOG(20, "cls_rbd::set_parent: invalid decode");
1006 return -EINVAL;
1007 }
1008
1009 int r = check_exists(hctx);
1010 if (r < 0) {
1011 CLS_LOG(20, "cls_rbd::set_parent: child already exists");
1012 return r;
1013 }
1014
1015 r = require_feature(hctx, RBD_FEATURE_LAYERING);
1016 if (r < 0) {
1017 CLS_LOG(20, "cls_rbd::set_parent: child does not support layering");
1018 return r;
1019 }
1020
1021 CLS_LOG(20, "set_parent pool=%llu id=%s snapid=%llu size=%llu",
1022 (unsigned long long)pool, id.c_str(), (unsigned long long)snapid.val,
1023 (unsigned long long)size);
1024
1025 if (pool < 0 || id.length() == 0 || snapid == CEPH_NOSNAP || size == 0) {
1026 return -EINVAL;
1027 }
1028
1029 // make sure there isn't already a parent
1030 cls_rbd_parent parent;
1031 r = read_key(hctx, "parent", &parent);
1032 if (r == 0) {
1033 CLS_LOG(20, "set_parent existing parent pool=%llu id=%s snapid=%llu"
1034 "overlap=%llu", (unsigned long long)parent.pool, parent.id.c_str(),
1035 (unsigned long long)parent.snapid.val,
1036 (unsigned long long)parent.overlap);
1037 return -EEXIST;
1038 }
1039
1040 // our overlap is the min of our size and the parent's size.
1041 uint64_t our_size;
1042 r = read_key(hctx, "size", &our_size);
1043 if (r < 0)
1044 return r;
1045
1046 bufferlist parentbl;
1047 parent.pool = pool;
1048 parent.id = id;
1049 parent.snapid = snapid;
1050 parent.overlap = MIN(our_size, size);
1051 ::encode(parent, parentbl);
1052 r = cls_cxx_map_set_val(hctx, "parent", &parentbl);
1053 if (r < 0) {
1054 CLS_ERR("error writing parent: %s", cpp_strerror(r).c_str());
1055 return r;
1056 }
1057
1058 return 0;
1059}
1060
1061
1062/**
1063 * remove the parent pointer
1064 *
1065 * This can only happen on the head, not on a snapshot. No arguments.
1066 *
1067 * @returns 0 on success, negative error code on failure.
1068 */
1069int remove_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1070{
1071 int r = check_exists(hctx);
1072 if (r < 0)
1073 return r;
1074
1075 r = require_feature(hctx, RBD_FEATURE_LAYERING);
1076 if (r < 0)
1077 return r;
1078
1079 uint64_t features;
1080 r = read_key(hctx, "features", &features);
1081 if (r < 0) {
1082 return r;
1083 }
1084
1085 // remove the parent from all snapshots
1086 if ((features & RBD_FEATURE_DEEP_FLATTEN) != 0) {
1087 int max_read = RBD_MAX_KEYS_READ;
1088 vector<snapid_t> snap_ids;
1089 string last_read = RBD_SNAP_KEY_PREFIX;
1090
1091 do {
1092 set<string> keys;
1093 r = cls_cxx_map_get_keys(hctx, last_read, max_read, &keys);
1094 if (r < 0) {
1095 return r;
1096 }
1097
1098 for (std::set<string>::const_iterator it = keys.begin();
1099 it != keys.end(); ++it) {
1100 if ((*it).find(RBD_SNAP_KEY_PREFIX) != 0) {
1101 break;
1102 }
1103
1104 uint64_t snap_id = snap_id_from_key(*it);
1105 cls_rbd_snap snap_meta;
1106 r = read_key(hctx, *it, &snap_meta);
1107 if (r < 0) {
1108 CLS_ERR("Could not read snapshot: snap_id=%" PRIu64 ": %s",
1109 snap_id, cpp_strerror(r).c_str());
1110 return r;
1111 }
1112
1113 snap_meta.parent = cls_rbd_parent();
1114
1115 bufferlist bl;
1116 ::encode(snap_meta, bl);
1117 r = cls_cxx_map_set_val(hctx, *it, &bl);
1118 if (r < 0) {
1119 CLS_ERR("Could not update snapshot: snap_id=%" PRIu64 ": %s",
1120 snap_id, cpp_strerror(r).c_str());
1121 return r;
1122 }
1123 }
1124
1125 if (!keys.empty()) {
1126 last_read = *(keys.rbegin());
1127 }
1128 } while (r == max_read);
1129 }
1130
1131 cls_rbd_parent parent;
1132 r = read_key(hctx, "parent", &parent);
1133 if (r < 0)
1134 return r;
1135
1136 r = cls_cxx_map_remove_key(hctx, "parent");
1137 if (r < 0) {
1138 CLS_ERR("error removing parent: %s", cpp_strerror(r).c_str());
1139 return r;
1140 }
1141 return 0;
1142}
1143
1144/**
1145 * methods for dealing with rbd_children object
1146 */
1147
1148static int decode_parent_common(bufferlist::iterator& it, uint64_t *pool_id,
1149 string *image_id, snapid_t *snap_id)
1150{
1151 try {
1152 ::decode(*pool_id, it);
1153 ::decode(*image_id, it);
1154 ::decode(*snap_id, it);
1155 } catch (const buffer::error &err) {
1156 CLS_ERR("error decoding parent spec");
1157 return -EINVAL;
1158 }
1159 return 0;
1160}
1161
1162static int decode_parent(bufferlist *in, uint64_t *pool_id,
1163 string *image_id, snapid_t *snap_id)
1164{
1165 bufferlist::iterator it = in->begin();
1166 return decode_parent_common(it, pool_id, image_id, snap_id);
1167}
1168
1169static int decode_parent_and_child(bufferlist *in, uint64_t *pool_id,
1170 string *image_id, snapid_t *snap_id,
1171 string *c_image_id)
1172{
1173 bufferlist::iterator it = in->begin();
1174 int r = decode_parent_common(it, pool_id, image_id, snap_id);
1175 if (r < 0)
1176 return r;
1177 try {
1178 ::decode(*c_image_id, it);
1179 } catch (const buffer::error &err) {
1180 CLS_ERR("error decoding child image id");
1181 return -EINVAL;
1182 }
1183 return 0;
1184}
1185
1186static string parent_key(uint64_t pool_id, string image_id, snapid_t snap_id)
1187{
1188 bufferlist key_bl;
1189 ::encode(pool_id, key_bl);
1190 ::encode(image_id, key_bl);
1191 ::encode(snap_id, key_bl);
1192 return string(key_bl.c_str(), key_bl.length());
1193}
1194
1195/**
1196 * add child to rbd_children directory object
1197 *
1198 * rbd_children is a map of (p_pool_id, p_image_id, p_snap_id) to
1199 * [c_image_id, [c_image_id ... ]]
1200 *
1201 * Input:
1202 * @param p_pool_id parent pool id
1203 * @param p_image_id parent image oid
1204 * @param p_snap_id parent snapshot id
1205 * @param c_image_id new child image oid to add
1206 *
1207 * @returns 0 on success, negative error on failure
1208 */
1209
1210int add_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1211{
1212 int r;
1213
1214 uint64_t p_pool_id;
1215 snapid_t p_snap_id;
1216 string p_image_id, c_image_id;
1217 // Use set for ease of erase() for remove_child()
1218 std::set<string> children;
1219
1220 r = decode_parent_and_child(in, &p_pool_id, &p_image_id, &p_snap_id,
1221 &c_image_id);
1222 if (r < 0)
1223 return r;
1224
1225 CLS_LOG(20, "add_child %s to (%" PRIu64 ", %s, %" PRIu64 ")", c_image_id.c_str(),
1226 p_pool_id, p_image_id.c_str(), p_snap_id.val);
1227
1228 string key = parent_key(p_pool_id, p_image_id, p_snap_id);
1229
1230 // get current child list for parent, if any
1231 r = read_key(hctx, key, &children);
1232 if ((r < 0) && (r != -ENOENT)) {
1233 CLS_LOG(20, "add_child: omap read failed: %s", cpp_strerror(r).c_str());
1234 return r;
1235 }
1236
1237 if (children.find(c_image_id) != children.end()) {
1238 CLS_LOG(20, "add_child: child already exists: %s", c_image_id.c_str());
1239 return -EEXIST;
1240 }
1241 // add new child
1242 children.insert(c_image_id);
1243
1244 // write back
1245 bufferlist childbl;
1246 ::encode(children, childbl);
1247 r = cls_cxx_map_set_val(hctx, key, &childbl);
1248 if (r < 0)
1249 CLS_LOG(20, "add_child: omap write failed: %s", cpp_strerror(r).c_str());
1250 return r;
1251}
1252
1253/**
1254 * remove child from rbd_children directory object
1255 *
1256 * Input:
1257 * @param p_pool_id parent pool id
1258 * @param p_image_id parent image oid
1259 * @param p_snap_id parent snapshot id
1260 * @param c_image_id new child image oid to add
1261 *
1262 * @returns 0 on success, negative error on failure
1263 */
1264
1265int remove_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1266{
1267 int r;
1268
1269 uint64_t p_pool_id;
1270 snapid_t p_snap_id;
1271 string p_image_id, c_image_id;
1272 std::set<string> children;
1273
1274 r = decode_parent_and_child(in, &p_pool_id, &p_image_id, &p_snap_id,
1275 &c_image_id);
1276 if (r < 0)
1277 return r;
1278
1279 CLS_LOG(20, "remove_child %s from (%" PRIu64 ", %s, %" PRIu64 ")",
1280 c_image_id.c_str(), p_pool_id, p_image_id.c_str(),
1281 p_snap_id.val);
1282
1283 string key = parent_key(p_pool_id, p_image_id, p_snap_id);
1284
1285 // get current child list for parent. Unlike add_child(), an empty list
1286 // is an error (how can we remove something that doesn't exist?)
1287 r = read_key(hctx, key, &children);
1288 if (r < 0) {
1289 CLS_LOG(20, "remove_child: read omap failed: %s", cpp_strerror(r).c_str());
1290 return r;
1291 }
1292
1293 if (children.find(c_image_id) == children.end()) {
1294 CLS_LOG(20, "remove_child: child not found: %s", c_image_id.c_str());
1295 return -ENOENT;
1296 }
1297 // find and remove child
1298 children.erase(c_image_id);
1299
1300 // now empty? remove key altogether
1301 if (children.empty()) {
1302 r = cls_cxx_map_remove_key(hctx, key);
1303 if (r < 0)
1304 CLS_LOG(20, "remove_child: remove key failed: %s", cpp_strerror(r).c_str());
1305 } else {
1306 // write back shortened children list
1307 bufferlist childbl;
1308 ::encode(children, childbl);
1309 r = cls_cxx_map_set_val(hctx, key, &childbl);
1310 if (r < 0)
1311 CLS_LOG(20, "remove_child: write omap failed: %s", cpp_strerror(r).c_str());
1312 }
1313 return r;
1314}
1315
1316/**
1317 * Input:
1318 * @param p_pool_id parent pool id
1319 * @param p_image_id parent image oid
1320 * @param p_snap_id parent snapshot id
1321 * @param c_image_id new child image oid to add
1322 *
1323 * Output:
1324 * @param children set<string> of children
1325 *
1326 * @returns 0 on success, negative error on failure
1327 */
1328int get_children(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1329{
1330 int r;
1331 uint64_t p_pool_id;
1332 snapid_t p_snap_id;
1333 string p_image_id;
1334 std::set<string> children;
1335
1336 r = decode_parent(in, &p_pool_id, &p_image_id, &p_snap_id);
1337 if (r < 0)
1338 return r;
1339
1340 CLS_LOG(20, "get_children of (%" PRIu64 ", %s, %" PRIu64 ")",
1341 p_pool_id, p_image_id.c_str(), p_snap_id.val);
1342
1343 string key = parent_key(p_pool_id, p_image_id, p_snap_id);
1344
1345 r = read_key(hctx, key, &children);
1346 if (r < 0) {
1347 if (r != -ENOENT)
1348 CLS_LOG(20, "get_children: read omap failed: %s", cpp_strerror(r).c_str());
1349 return r;
1350 }
1351 ::encode(children, *out);
1352 return 0;
1353}
1354
1355
1356/**
1357 * Get the information needed to create a rados snap context for doing
1358 * I/O to the data objects. This must include all snapshots.
1359 *
1360 * Output:
1361 * @param snap_seq the highest snapshot id ever associated with the image (uint64_t)
1362 * @param snap_ids existing snapshot ids in descending order (vector<uint64_t>)
1363 * @returns 0 on success, negative error code on failure
1364 */
1365int get_snapcontext(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1366{
1367 CLS_LOG(20, "get_snapcontext");
1368
1369 int r;
1370 int max_read = RBD_MAX_KEYS_READ;
1371 vector<snapid_t> snap_ids;
1372 string last_read = RBD_SNAP_KEY_PREFIX;
1373
1374 do {
1375 set<string> keys;
1376 r = cls_cxx_map_get_keys(hctx, last_read, max_read, &keys);
1377 if (r < 0)
1378 return r;
1379
1380 for (set<string>::const_iterator it = keys.begin();
1381 it != keys.end(); ++it) {
1382 if ((*it).find(RBD_SNAP_KEY_PREFIX) != 0)
1383 break;
1384 snapid_t snap_id = snap_id_from_key(*it);
1385 snap_ids.push_back(snap_id);
1386 }
1387 if (!keys.empty())
1388 last_read = *(keys.rbegin());
1389 } while (r == max_read);
1390
1391 uint64_t snap_seq;
1392 r = read_key(hctx, "snap_seq", &snap_seq);
1393 if (r < 0) {
1394 CLS_ERR("could not read the image's snap_seq off disk: %s", cpp_strerror(r).c_str());
1395 return r;
1396 }
1397
1398 // snap_ids must be descending in a snap context
1399 std::reverse(snap_ids.begin(), snap_ids.end());
1400
1401 ::encode(snap_seq, *out);
1402 ::encode(snap_ids, *out);
1403
1404 return 0;
1405}
1406
1407/**
1408 * Output:
1409 * @param object_prefix prefix for data object names (string)
1410 * @returns 0 on success, negative error code on failure
1411 */
1412int get_object_prefix(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1413{
1414 CLS_LOG(20, "get_object_prefix");
1415
1416 string object_prefix;
1417 int r = read_key(hctx, "object_prefix", &object_prefix);
1418 if (r < 0) {
1419 CLS_ERR("failed to read the image's object prefix off of disk: %s",
1420 cpp_strerror(r).c_str());
1421 return r;
1422 }
1423
1424 ::encode(object_prefix, *out);
1425
1426 return 0;
1427}
1428
1429/**
1430 * Input:
1431 * none
1432 *
1433 * Output:
1434 * @param pool_id (int64_t) of data pool or -1 if none
1435 * @returns 0 on success, negative error code on failure
1436 */
1437int get_data_pool(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1438{
1439 CLS_LOG(20, "get_data_pool");
1440
1441 int64_t data_pool_id = -1;
1442 int r = read_key(hctx, "data_pool_id", &data_pool_id);
1443 if (r == -ENOENT) {
1444 data_pool_id = -1;
1445 } else if (r < 0) {
1446 CLS_ERR("error reading image data pool id: %s", cpp_strerror(r).c_str());
1447 return r;
1448 }
1449
1450 ::encode(data_pool_id, *out);
1451 return 0;
1452}
1453
1454int get_snapshot_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1455{
1456 uint64_t snap_id;
1457
1458 bufferlist::iterator iter = in->begin();
1459 try {
1460 ::decode(snap_id, iter);
1461 } catch (const buffer::error &err) {
1462 return -EINVAL;
1463 }
1464
1465 CLS_LOG(20, "get_snapshot_name snap_id=%llu", (unsigned long long)snap_id);
1466
1467 if (snap_id == CEPH_NOSNAP)
1468 return -EINVAL;
1469
1470 cls_rbd_snap snap;
1471 string snapshot_key;
1472 key_from_snap_id(snap_id, &snapshot_key);
1473 int r = read_key(hctx, snapshot_key, &snap);
1474 if (r < 0)
1475 return r;
1476
1477 ::encode(snap.name, *out);
1478
1479 return 0;
1480}
1481
1482int get_snapshot_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1483{
1484 uint64_t snap_id;
1485
1486 bufferlist::iterator iter = in->begin();
1487 try {
1488 ::decode(snap_id, iter);
1489 } catch (const buffer::error &err) {
1490 return -EINVAL;
1491 }
1492
1493 CLS_LOG(20, "get_snapshot_timestamp snap_id=%llu", (unsigned long long)snap_id);
1494
1495 if (snap_id == CEPH_NOSNAP) {
1496 return -EINVAL;
1497 }
1498
1499 cls_rbd_snap snap;
1500 string snapshot_key;
1501 key_from_snap_id(snap_id, &snapshot_key);
1502 int r = read_key(hctx, snapshot_key, &snap);
1503 if (r < 0) {
1504 return r;
1505 }
1506
1507 ::encode(snap.timestamp, *out);
1508 return 0;
1509}
1510
1511/**
1512 * Retrieve namespace of a snapshot.
1513 *
1514 * Input:
1515 * @param snap_id id of the snapshot (uint64_t)
1516 *
1517 * Output:
1518 * @param SnapshotNamespace
1519 * @returns 0 on success, negative error code on failure.
1520 */
1521int get_snapshot_namespace(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1522{
1523 uint64_t snap_id;
1524
1525 bufferlist::iterator iter = in->begin();
1526 try {
1527 ::decode(snap_id, iter);
1528 } catch (const buffer::error &err) {
1529 return -EINVAL;
1530 }
1531
1532 CLS_LOG(20, "get_snapshot_namespace snap_id=%" PRIu64, snap_id);
1533
1534 if (snap_id == CEPH_NOSNAP) {
1535 return -EINVAL;
1536 }
1537
1538 cls_rbd_snap snap;
1539 string snapshot_key;
1540 key_from_snap_id(snap_id, &snapshot_key);
1541 int r = read_key(hctx, snapshot_key, &snap);
1542 if (r < 0) {
1543 return r;
1544 }
1545
1546 ::encode(snap.snapshot_namespace, *out);
1547
1548 return 0;
1549}
1550
1551/**
1552 * Adds a snapshot to an rbd header. Ensures the id and name are unique.
1553 *
1554 * Input:
1555 * @param snap_name name of the snapshot (string)
1556 * @param snap_id id of the snapshot (uint64_t)
1557 * @param snap_namespace namespace of the snapshot (cls::rbd::SnapshotNamespaceOnDisk)
1558 *
1559 * Output:
1560 * @returns 0 on success, negative error code on failure.
1561 * @returns -ESTALE if the input snap_id is less than the image's snap_seq
1562 * @returns -EEXIST if the id or name are already used by another snapshot
1563 */
1564int snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1565{
1566 bufferlist snap_namebl, snap_idbl;
1567 cls_rbd_snap snap_meta;
1568 uint64_t snap_limit;
1569
1570 try {
1571 bufferlist::iterator iter = in->begin();
1572 ::decode(snap_meta.name, iter);
1573 ::decode(snap_meta.id, iter);
1574 if (!iter.end()) {
1575 ::decode(snap_meta.snapshot_namespace, iter);
1576 }
1577 } catch (const buffer::error &err) {
1578 return -EINVAL;
1579 }
1580
1581 if (boost::get<cls::rbd::UnknownSnapshotNamespace>(
1582 &snap_meta.snapshot_namespace.snapshot_namespace) != nullptr) {
1583 CLS_ERR("Unknown snapshot namespace provided");
1584 return -EINVAL;
1585 }
1586
1587 CLS_LOG(20, "snapshot_add name=%s id=%llu", snap_meta.name.c_str(),
1588 (unsigned long long)snap_meta.id.val);
1589
1590 if (snap_meta.id > CEPH_MAXSNAP)
1591 return -EINVAL;
1592
1593 uint64_t cur_snap_seq;
1594 int r = read_key(hctx, "snap_seq", &cur_snap_seq);
1595 if (r < 0) {
1596 CLS_ERR("Could not read image's snap_seq off disk: %s", cpp_strerror(r).c_str());
1597 return r;
1598 }
1599
1600 // client lost a race with another snapshot creation.
1601 // snap_seq must be monotonically increasing.
1602 if (snap_meta.id < cur_snap_seq)
1603 return -ESTALE;
1604
1605 r = read_key(hctx, "size", &snap_meta.image_size);
1606 if (r < 0) {
1607 CLS_ERR("Could not read image's size off disk: %s", cpp_strerror(r).c_str());
1608 return r;
1609 }
1610 r = read_key(hctx, "features", &snap_meta.features);
1611 if (r < 0) {
1612 CLS_ERR("Could not read image's features off disk: %s", cpp_strerror(r).c_str());
1613 return r;
1614 }
1615 r = read_key(hctx, "flags", &snap_meta.flags);
1616 if (r < 0 && r != -ENOENT) {
1617 CLS_ERR("Could not read image's flags off disk: %s", cpp_strerror(r).c_str());
1618 return r;
1619 }
1620
1621 r = read_key(hctx, "snap_limit", &snap_limit);
1622 if (r == -ENOENT) {
1623 snap_limit = UINT64_MAX;
1624 } else if (r < 0) {
1625 CLS_ERR("Could not read snapshot limit off disk: %s", cpp_strerror(r).c_str());
1626 return r;
1627 }
1628
1629 snap_meta.timestamp = ceph_clock_now();
1630
1631 int max_read = RBD_MAX_KEYS_READ;
1632 uint64_t total_read = 0;
1633 string last_read = RBD_SNAP_KEY_PREFIX;
1634 do {
1635 map<string, bufferlist> vals;
1636 r = cls_cxx_map_get_vals(hctx, last_read, RBD_SNAP_KEY_PREFIX,
1637 max_read, &vals);
1638 if (r < 0)
1639 return r;
1640
1641 total_read += vals.size();
1642 if (total_read >= snap_limit) {
1643 CLS_ERR("Attempt to create snapshot over limit of %" PRIu64, snap_limit);
1644 return -EDQUOT;
1645 }
1646
1647 for (map<string, bufferlist>::iterator it = vals.begin();
1648 it != vals.end(); ++it) {
1649 cls_rbd_snap old_meta;
1650 bufferlist::iterator iter = it->second.begin();
1651 try {
1652 ::decode(old_meta, iter);
1653 } catch (const buffer::error &err) {
1654 snapid_t snap_id = snap_id_from_key(it->first);
1655 CLS_ERR("error decoding snapshot metadata for snap_id: %llu",
1656 (unsigned long long)snap_id.val);
1657 return -EIO;
1658 }
1659 if ((snap_meta.name == old_meta.name &&
1660 snap_meta.snapshot_namespace == old_meta.snapshot_namespace) ||
1661 snap_meta.id == old_meta.id) {
1662 CLS_LOG(20, "snap_name %s or snap_id %llu matches existing snap %s %llu",
1663 snap_meta.name.c_str(), (unsigned long long)snap_meta.id.val,
1664 old_meta.name.c_str(), (unsigned long long)old_meta.id.val);
1665 return -EEXIST;
1666 }
1667 }
1668
1669 if (!vals.empty())
1670 last_read = vals.rbegin()->first;
1671 } while (r == RBD_MAX_KEYS_READ);
1672
1673 // snapshot inherits parent, if any
1674 cls_rbd_parent parent;
1675 r = read_key(hctx, "parent", &parent);
1676 if (r < 0 && r != -ENOENT)
1677 return r;
1678 if (r == 0) {
1679 snap_meta.parent = parent;
1680 }
1681
1682 bufferlist snap_metabl, snap_seqbl;
1683 ::encode(snap_meta, snap_metabl);
1684 ::encode(snap_meta.id, snap_seqbl);
1685
1686 string snapshot_key;
1687 key_from_snap_id(snap_meta.id, &snapshot_key);
1688 map<string, bufferlist> vals;
1689 vals["snap_seq"] = snap_seqbl;
1690 vals[snapshot_key] = snap_metabl;
1691 r = cls_cxx_map_set_vals(hctx, &vals);
1692 if (r < 0) {
1693 CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
1694 return r;
1695 }
1696
1697 return 0;
1698}
1699
1700
1701/**
1702 * rename snapshot .
1703 *
1704 * Input:
1705 * @param src_snap_id old snap id of the snapshot (snapid_t)
1706 * @param dst_snap_name new name of the snapshot (string)
1707 *
1708 * Output:
1709 * @returns 0 on success, negative error code on failure.
1710 */
1711int snapshot_rename(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1712{
1713 bufferlist snap_namebl, snap_idbl;
1714 snapid_t src_snap_id;
1715 string src_snap_key,dst_snap_name;
1716 cls_rbd_snap snap_meta;
1717 int r;
1718
1719 try {
1720 bufferlist::iterator iter = in->begin();
1721 ::decode(src_snap_id, iter);
1722 ::decode(dst_snap_name, iter);
1723 } catch (const buffer::error &err) {
1724 return -EINVAL;
1725 }
1726
1727 CLS_LOG(20, "snapshot_rename id=%llu dst_name=%s", (unsigned long long)src_snap_id.val,
1728 dst_snap_name.c_str());
1729
1730 int max_read = RBD_MAX_KEYS_READ;
1731 string last_read = RBD_SNAP_KEY_PREFIX;
1732 do {
1733 map<string, bufferlist> vals;
1734 r = cls_cxx_map_get_vals(hctx, last_read, RBD_SNAP_KEY_PREFIX,
1735 max_read, &vals);
1736 if (r < 0)
1737 return r;
1738
1739 for (map<string, bufferlist>::iterator it = vals.begin();
1740 it != vals.end(); ++it) {
1741 bufferlist::iterator iter = it->second.begin();
1742 try {
1743 ::decode(snap_meta, iter);
1744 } catch (const buffer::error &err) {
1745 CLS_ERR("error decoding snapshot metadata for snap : %s",
1746 dst_snap_name.c_str());
1747 return -EIO;
1748 }
1749 if (dst_snap_name == snap_meta.name) {
1750 CLS_LOG(20, "snap_name %s matches existing snap with snap id = %llu ",
1751 dst_snap_name.c_str(), (unsigned long long)snap_meta.id.val);
1752 return -EEXIST;
1753 }
1754 }
1755 if (!vals.empty())
1756 last_read = vals.rbegin()->first;
1757 } while (r == RBD_MAX_KEYS_READ);
1758
1759 key_from_snap_id(src_snap_id, &src_snap_key);
1760 r = read_key(hctx, src_snap_key, &snap_meta);
1761 if (r == -ENOENT) {
1762 CLS_LOG(20, "cannot find existing snap with snap id = %llu ", (unsigned long long)src_snap_id);
1763 return r;
1764 }
1765 snap_meta.name = dst_snap_name;
1766 bufferlist snap_metabl;
1767 ::encode(snap_meta, snap_metabl);
1768
1769 r = cls_cxx_map_set_val(hctx, src_snap_key, &snap_metabl);
1770 if (r < 0) {
1771 CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
1772 return r;
1773 }
1774
1775 return 0;
1776}
1777/**
1778 * Removes a snapshot from an rbd header.
1779 *
1780 * Input:
1781 * @param snap_id the id of the snapshot to remove (uint64_t)
1782 *
1783 * Output:
1784 * @returns 0 on success, negative error code on failure
1785 */
1786int snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1787{
1788 snapid_t snap_id;
1789
1790 try {
1791 bufferlist::iterator iter = in->begin();
1792 ::decode(snap_id, iter);
1793 } catch (const buffer::error &err) {
1794 return -EINVAL;
1795 }
1796
1797 CLS_LOG(20, "snapshot_remove id=%llu", (unsigned long long)snap_id.val);
1798
1799 // check if the key exists. we can't rely on remove_key doing this for
1800 // us, since OMAPRMKEYS returns success if the key is not there.
1801 // bug or feature? sounds like a bug, since tmap did not have this
1802 // behavior, but cls_rgw may rely on it...
1803 cls_rbd_snap snap;
1804 string snapshot_key;
1805 key_from_snap_id(snap_id, &snapshot_key);
1806 int r = read_key(hctx, snapshot_key, &snap);
1807 if (r == -ENOENT)
1808 return -ENOENT;
1809
1810 if (snap.protection_status != RBD_PROTECTION_STATUS_UNPROTECTED)
1811 return -EBUSY;
1812
1813 r = cls_cxx_map_remove_key(hctx, snapshot_key);
1814 if (r < 0) {
1815 CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
1816 return r;
1817 }
1818
1819 return 0;
1820}
1821
1822/**
1823 * Returns a uint64_t of all the features supported by this class.
1824 */
1825int get_all_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1826{
1827 uint64_t all_features = RBD_FEATURES_ALL;
1828 ::encode(all_features, *out);
1829 return 0;
1830}
1831
1832/**
1833 * "Copy up" data from the parent of a clone to the clone's object(s).
1834 * Used for implementing copy-on-write for a clone image. Client
1835 * will pass down a chunk of data that fits completely within one
1836 * clone block (one object), and is aligned (starts at beginning of block),
1837 * but may be shorter (for non-full parent blocks). The class method
1838 * can't know the object size to validate the requested length,
1839 * so it just writes the data as given if the child object doesn't
1840 * already exist, and returns success if it does.
1841 *
1842 * Input:
1843 * @param in bufferlist of data to write
1844 *
1845 * Output:
1846 * @returns 0 on success, or if block already exists in child
1847 * negative error code on other error
1848 */
1849
1850int copyup(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1851{
1852 // check for existence; if child object exists, just return success
1853 if (cls_cxx_stat(hctx, NULL, NULL) == 0)
1854 return 0;
1855 CLS_LOG(20, "copyup: writing length %d\n", in->length());
1856 return cls_cxx_write(hctx, 0, in->length(), in);
1857}
1858
1859
1860/************************ rbd_id object methods **************************/
1861
1862/**
1863 * Input:
1864 * @param in ignored
1865 *
1866 * Output:
1867 * @param id the id stored in the object
1868 * @returns 0 on success, negative error code on failure
1869 */
1870int get_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1871{
1872 uint64_t size;
1873 int r = cls_cxx_stat(hctx, &size, NULL);
1874 if (r < 0)
1875 return r;
1876
1877 if (size == 0)
1878 return -ENOENT;
1879
1880 bufferlist read_bl;
1881 r = cls_cxx_read(hctx, 0, size, &read_bl);
1882 if (r < 0) {
1883 CLS_ERR("get_id: could not read id: %s", cpp_strerror(r).c_str());
1884 return r;
1885 }
1886
1887 string id;
1888 try {
1889 bufferlist::iterator iter = read_bl.begin();
1890 ::decode(id, iter);
1891 } catch (const buffer::error &err) {
1892 return -EIO;
1893 }
1894
1895 ::encode(id, *out);
1896 return 0;
1897}
1898
1899/**
1900 * Set the id of an image. The object must already exist.
1901 *
1902 * Input:
1903 * @param id the id of the image, as an alpha-numeric string
1904 *
1905 * Output:
1906 * @returns 0 on success, -EEXIST if the atomic create fails,
1907 * negative error code on other error
1908 */
1909int set_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1910{
1911 int r = check_exists(hctx);
1912 if (r < 0)
1913 return r;
1914
1915 string id;
1916 try {
1917 bufferlist::iterator iter = in->begin();
1918 ::decode(id, iter);
1919 } catch (const buffer::error &err) {
1920 return -EINVAL;
1921 }
1922
1923 if (!is_valid_id(id)) {
1924 CLS_ERR("set_id: invalid id '%s'", id.c_str());
1925 return -EINVAL;
1926 }
1927
1928 uint64_t size;
1929 r = cls_cxx_stat(hctx, &size, NULL);
1930 if (r < 0)
1931 return r;
1932 if (size != 0)
1933 return -EEXIST;
1934
1935 CLS_LOG(20, "set_id: id=%s", id.c_str());
1936
1937 bufferlist write_bl;
1938 ::encode(id, write_bl);
1939 return cls_cxx_write(hctx, 0, write_bl.length(), &write_bl);
1940}
1941
1942/*********************** methods for rbd_directory ***********************/
1943
1944static const string dir_key_for_id(const string &id)
1945{
1946 return RBD_DIR_ID_KEY_PREFIX + id;
1947}
1948
1949static const string dir_key_for_name(const string &name)
1950{
1951 return RBD_DIR_NAME_KEY_PREFIX + name;
1952}
1953
1954static const string dir_name_from_key(const string &key)
1955{
1956 return key.substr(strlen(RBD_DIR_NAME_KEY_PREFIX));
1957}
1958
1959static int dir_add_image_helper(cls_method_context_t hctx,
1960 const string &name, const string &id,
1961 bool check_for_unique_id)
1962{
1963 if (!name.size() || !is_valid_id(id)) {
1964 CLS_ERR("dir_add_image_helper: invalid name '%s' or id '%s'",
1965 name.c_str(), id.c_str());
1966 return -EINVAL;
1967 }
1968
1969 CLS_LOG(20, "dir_add_image_helper name=%s id=%s", name.c_str(), id.c_str());
1970
1971 string tmp;
1972 string name_key = dir_key_for_name(name);
1973 string id_key = dir_key_for_id(id);
1974 int r = read_key(hctx, name_key, &tmp);
1975 if (r != -ENOENT) {
1976 CLS_LOG(10, "name already exists");
1977 return -EEXIST;
1978 }
1979 r = read_key(hctx, id_key, &tmp);
1980 if (r != -ENOENT && check_for_unique_id) {
1981 CLS_LOG(10, "id already exists");
1982 return -EBADF;
1983 }
1984 bufferlist id_bl, name_bl;
1985 ::encode(id, id_bl);
1986 ::encode(name, name_bl);
1987 map<string, bufferlist> omap_vals;
1988 omap_vals[name_key] = id_bl;
1989 omap_vals[id_key] = name_bl;
1990 return cls_cxx_map_set_vals(hctx, &omap_vals);
1991}
1992
1993static int dir_remove_image_helper(cls_method_context_t hctx,
1994 const string &name, const string &id)
1995{
1996 CLS_LOG(20, "dir_remove_image_helper name=%s id=%s",
1997 name.c_str(), id.c_str());
1998
1999 string stored_name, stored_id;
2000 string name_key = dir_key_for_name(name);
2001 string id_key = dir_key_for_id(id);
2002 int r = read_key(hctx, name_key, &stored_id);
2003 if (r < 0) {
2004 if (r != -ENOENT)
2005 CLS_ERR("error reading name to id mapping: %s", cpp_strerror(r).c_str());
2006 return r;
2007 }
2008 r = read_key(hctx, id_key, &stored_name);
2009 if (r < 0) {
2010 CLS_ERR("error reading id to name mapping: %s", cpp_strerror(r).c_str());
2011 return r;
2012 }
2013
2014 // check if this op raced with a rename
2015 if (stored_name != name || stored_id != id) {
2016 CLS_ERR("stored name '%s' and id '%s' do not match args '%s' and '%s'",
2017 stored_name.c_str(), stored_id.c_str(), name.c_str(), id.c_str());
2018 return -ESTALE;
2019 }
2020
2021 r = cls_cxx_map_remove_key(hctx, name_key);
2022 if (r < 0) {
2023 CLS_ERR("error removing name: %s", cpp_strerror(r).c_str());
2024 return r;
2025 }
2026
2027 r = cls_cxx_map_remove_key(hctx, id_key);
2028 if (r < 0) {
2029 CLS_ERR("error removing id: %s", cpp_strerror(r).c_str());
2030 return r;
2031 }
2032
2033 return 0;
2034}
2035
2036/**
2037 * Rename an image in the directory, updating both indexes
2038 * atomically. This can't be done from the client calling
2039 * dir_add_image and dir_remove_image in one transaction because the
2040 * results of the first method are not visibale to later steps.
2041 *
2042 * Input:
2043 * @param src original name of the image
2044 * @param dest new name of the image
2045 * @param id the id of the image
2046 *
2047 * Output:
2048 * @returns -ESTALE if src and id do not map to each other
2049 * @returns -ENOENT if src or id are not in the directory
2050 * @returns -EEXIST if dest already exists
2051 * @returns 0 on success, negative error code on failure
2052 */
2053int dir_rename_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2054{
2055 string src, dest, id;
2056 try {
2057 bufferlist::iterator iter = in->begin();
2058 ::decode(src, iter);
2059 ::decode(dest, iter);
2060 ::decode(id, iter);
2061 } catch (const buffer::error &err) {
2062 return -EINVAL;
2063 }
2064
2065 int r = dir_remove_image_helper(hctx, src, id);
2066 if (r < 0)
2067 return r;
2068 // ignore duplicate id because the result of
2069 // remove_image_helper is not visible yet
2070 return dir_add_image_helper(hctx, dest, id, false);
2071}
2072
2073/**
2074 * Get the id of an image given its name.
2075 *
2076 * Input:
2077 * @param name the name of the image
2078 *
2079 * Output:
2080 * @param id the id of the image
2081 * @returns 0 on success, negative error code on failure
2082 */
2083int dir_get_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2084{
2085 string name;
2086
2087 try {
2088 bufferlist::iterator iter = in->begin();
2089 ::decode(name, iter);
2090 } catch (const buffer::error &err) {
2091 return -EINVAL;
2092 }
2093
2094 CLS_LOG(20, "dir_get_id: name=%s", name.c_str());
2095
2096 string id;
2097 int r = read_key(hctx, dir_key_for_name(name), &id);
2098 if (r < 0) {
2099 if (r != -ENOENT)
2100 CLS_ERR("error reading id for name '%s': %s", name.c_str(), cpp_strerror(r).c_str());
2101 return r;
2102 }
2103 ::encode(id, *out);
2104 return 0;
2105}
2106
2107/**
2108 * Get the name of an image given its id.
2109 *
2110 * Input:
2111 * @param id the id of the image
2112 *
2113 * Output:
2114 * @param name the name of the image
2115 * @returns 0 on success, negative error code on failure
2116 */
2117int dir_get_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2118{
2119 string id;
2120
2121 try {
2122 bufferlist::iterator iter = in->begin();
2123 ::decode(id, iter);
2124 } catch (const buffer::error &err) {
2125 return -EINVAL;
2126 }
2127
2128 CLS_LOG(20, "dir_get_name: id=%s", id.c_str());
2129
2130 string name;
2131 int r = read_key(hctx, dir_key_for_id(id), &name);
2132 if (r < 0) {
2133 CLS_ERR("error reading name for id '%s': %s", id.c_str(), cpp_strerror(r).c_str());
2134 return r;
2135 }
2136 ::encode(name, *out);
2137 return 0;
2138}
2139
2140/**
2141 * List the names and ids of the images in the directory, sorted by
2142 * name.
2143 *
2144 * Input:
2145 * @param start_after which name to begin listing after
2146 * (use the empty string to start at the beginning)
2147 * @param max_return the maximum number of names to list
2148 *
2149 * Output:
2150 * @param images map from name to id of up to max_return images
2151 * @returns 0 on success, negative error code on failure
2152 */
2153int dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2154{
2155 string start_after;
2156 uint64_t max_return;
2157
2158 try {
2159 bufferlist::iterator iter = in->begin();
2160 ::decode(start_after, iter);
2161 ::decode(max_return, iter);
2162 } catch (const buffer::error &err) {
2163 return -EINVAL;
2164 }
2165
2166 int max_read = RBD_MAX_KEYS_READ;
2167 int r = max_read;
2168 map<string, string> images;
2169 string last_read = dir_key_for_name(start_after);
2170
2171 while (r == max_read && images.size() < max_return) {
2172 map<string, bufferlist> vals;
2173 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
2174 r = cls_cxx_map_get_vals(hctx, last_read, RBD_DIR_NAME_KEY_PREFIX,
2175 max_read, &vals);
2176 if (r < 0) {
2177 CLS_ERR("error reading directory by name: %s", cpp_strerror(r).c_str());
2178 return r;
2179 }
2180
2181 for (map<string, bufferlist>::iterator it = vals.begin();
2182 it != vals.end(); ++it) {
2183 string id;
2184 bufferlist::iterator iter = it->second.begin();
2185 try {
2186 ::decode(id, iter);
2187 } catch (const buffer::error &err) {
2188 CLS_ERR("could not decode id of image '%s'", it->first.c_str());
2189 return -EIO;
2190 }
2191 CLS_LOG(20, "adding '%s' -> '%s'", dir_name_from_key(it->first).c_str(), id.c_str());
2192 images[dir_name_from_key(it->first)] = id;
2193 if (images.size() >= max_return)
2194 break;
2195 }
2196 if (!vals.empty()) {
2197 last_read = dir_key_for_name(images.rbegin()->first);
2198 }
2199 }
2200
2201 ::encode(images, *out);
2202
2203 return 0;
2204}
2205
2206/**
2207 * Add an image to the rbd directory. Creates the directory object if
2208 * needed, and updates the index from id to name and name to id.
2209 *
2210 * Input:
2211 * @param name the name of the image
2212 * @param id the id of the image
2213 *
2214 * Output:
2215 * @returns -EEXIST if the image name is already in the directory
2216 * @returns -EBADF if the image id is already in the directory
2217 * @returns 0 on success, negative error code on failure
2218 */
2219int dir_add_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2220{
2221 int r = cls_cxx_create(hctx, false);
2222 if (r < 0) {
2223 CLS_ERR("could not create directory: %s", cpp_strerror(r).c_str());
2224 return r;
2225 }
2226
2227 string name, id;
2228 try {
2229 bufferlist::iterator iter = in->begin();
2230 ::decode(name, iter);
2231 ::decode(id, iter);
2232 } catch (const buffer::error &err) {
2233 return -EINVAL;
2234 }
2235
2236 return dir_add_image_helper(hctx, name, id, true);
2237}
2238
2239/**
2240 * Remove an image from the rbd directory.
2241 *
2242 * Input:
2243 * @param name the name of the image
2244 * @param id the id of the image
2245 *
2246 * Output:
2247 * @returns -ESTALE if the name and id do not map to each other
2248 * @returns 0 on success, negative error code on failure
2249 */
2250int dir_remove_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2251{
2252 string name, id;
2253 try {
2254 bufferlist::iterator iter = in->begin();
2255 ::decode(name, iter);
2256 ::decode(id, iter);
2257 } catch (const buffer::error &err) {
2258 return -EINVAL;
2259 }
2260
2261 return dir_remove_image_helper(hctx, name, id);
2262}
2263
2264int object_map_read(cls_method_context_t hctx, BitVector<2> &object_map)
2265{
2266 uint64_t size;
2267 int r = cls_cxx_stat(hctx, &size, NULL);
2268 if (r < 0) {
2269 return r;
2270 }
2271 if (size == 0) {
2272 return -ENOENT;
2273 }
2274
2275 bufferlist bl;
2276 r = cls_cxx_read(hctx, 0, size, &bl);
2277 if (r < 0) {
2278 return r;
2279 }
2280
2281 try {
2282 bufferlist::iterator iter = bl.begin();
2283 ::decode(object_map, iter);
2284 } catch (const buffer::error &err) {
2285 CLS_ERR("failed to decode object map: %s", err.what());
2286 return -EINVAL;
2287 }
2288 return 0;
2289}
2290
2291/**
2292 * Load an rbd image's object map
2293 *
2294 * Input:
2295 * none
2296 *
2297 * Output:
2298 * @param object map bit vector
2299 * @returns 0 on success, negative error code on failure
2300 */
2301int object_map_load(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2302{
2303 BitVector<2> object_map;
2304 int r = object_map_read(hctx, object_map);
2305 if (r < 0) {
2306 return r;
2307 }
2308
2309 object_map.set_crc_enabled(false);
2310 ::encode(object_map, *out);
2311 return 0;
2312}
2313
2314/**
2315 * Save an rbd image's object map
2316 *
2317 * Input:
2318 * @param object map bit vector
2319 *
2320 * Output:
2321 * @returns 0 on success, negative error code on failure
2322 */
2323int object_map_save(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2324{
2325 BitVector<2> object_map;
2326 try {
2327 bufferlist::iterator iter = in->begin();
2328 ::decode(object_map, iter);
2329 } catch (const buffer::error &err) {
2330 return -EINVAL;
2331 }
2332
2333 object_map.set_crc_enabled(true);
2334
2335 bufferlist bl;
2336 ::encode(object_map, bl);
2337 CLS_LOG(20, "object_map_save: object size=%" PRIu64 ", byte size=%u",
2338 object_map.size(), bl.length());
2339 return cls_cxx_write_full(hctx, &bl);
2340}
2341
2342/**
2343 * Resize an rbd image's object map
2344 *
2345 * Input:
2346 * @param object_count the max number of objects in the image
2347 * @param default_state the default state of newly created objects
2348 *
2349 * Output:
2350 * @returns 0 on success, negative error code on failure
2351 */
2352int object_map_resize(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2353{
2354 uint64_t object_count;
2355 uint8_t default_state;
2356 try {
2357 bufferlist::iterator iter = in->begin();
2358 ::decode(object_count, iter);
2359 ::decode(default_state, iter);
2360 } catch (const buffer::error &err) {
2361 return -EINVAL;
2362 }
2363
2364 // protect against excessive memory requirements
2365 if (object_count > cls::rbd::MAX_OBJECT_MAP_OBJECT_COUNT) {
2366 CLS_ERR("object map too large: %" PRIu64, object_count);
2367 return -EINVAL;
2368 }
2369
2370 BitVector<2> object_map;
2371 int r = object_map_read(hctx, object_map);
2372 if ((r < 0) && (r != -ENOENT)) {
2373 return r;
2374 }
2375
2376 size_t orig_object_map_size = object_map.size();
2377 if (object_count < orig_object_map_size) {
2378 for (uint64_t i = object_count + 1; i < orig_object_map_size; ++i) {
2379 if (object_map[i] != default_state) {
2380 CLS_ERR("object map indicates object still exists: %" PRIu64, i);
2381 return -ESTALE;
2382 }
2383 }
2384 object_map.resize(object_count);
2385 } else if (object_count > orig_object_map_size) {
2386 object_map.resize(object_count);
2387 for (uint64_t i = orig_object_map_size; i < object_count; ++i) {
2388 object_map[i] = default_state;
2389 }
2390 }
2391
2392 bufferlist map;
2393 ::encode(object_map, map);
2394 CLS_LOG(20, "object_map_resize: object size=%" PRIu64 ", byte size=%u",
2395 object_count, map.length());
2396 return cls_cxx_write_full(hctx, &map);
2397}
2398
2399/**
2400 * Update an rbd image's object map
2401 *
2402 * Input:
2403 * @param start_object_no the start object iterator
2404 * @param end_object_no the end object iterator
2405 * @param new_object_state the new object state
2406 * @param current_object_state optional current object state filter
2407 *
2408 * Output:
2409 * @returns 0 on success, negative error code on failure
2410 */
2411int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2412{
2413 uint64_t start_object_no;
2414 uint64_t end_object_no;
2415 uint8_t new_object_state;
2416 boost::optional<uint8_t> current_object_state;
2417 try {
2418 bufferlist::iterator iter = in->begin();
2419 ::decode(start_object_no, iter);
2420 ::decode(end_object_no, iter);
2421 ::decode(new_object_state, iter);
2422 ::decode(current_object_state, iter);
2423 } catch (const buffer::error &err) {
2424 CLS_ERR("failed to decode message");
2425 return -EINVAL;
2426 }
2427
2428 uint64_t size;
2429 int r = cls_cxx_stat(hctx, &size, NULL);
2430 if (r < 0) {
2431 return r;
2432 }
2433
2434 BitVector<2> object_map;
2435 bufferlist header_bl;
2436 r = cls_cxx_read2(hctx, 0, object_map.get_header_length(), &header_bl,
2437 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2438 if (r < 0) {
2439 CLS_ERR("object map header read failed");
2440 return r;
2441 }
2442
2443 try {
2444 bufferlist::iterator it = header_bl.begin();
2445 object_map.decode_header(it);
2446 } catch (const buffer::error &err) {
2447 CLS_ERR("failed to decode object map header: %s", err.what());
2448 return -EINVAL;
2449 }
2450
2451 bufferlist footer_bl;
2452 r = cls_cxx_read2(hctx, object_map.get_footer_offset(),
2453 size - object_map.get_footer_offset(), &footer_bl,
2454 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2455 if (r < 0) {
2456 CLS_ERR("object map footer read failed");
2457 return r;
2458 }
2459
2460 try {
2461 bufferlist::iterator it = footer_bl.begin();
2462 object_map.decode_footer(it);
2463 } catch (const buffer::error &err) {
2464 CLS_ERR("failed to decode object map footer: %s", err.what());
2465 }
2466
2467 if (start_object_no >= end_object_no || end_object_no > object_map.size()) {
2468 return -ERANGE;
2469 }
2470
2471 uint64_t byte_offset;
2472 uint64_t byte_length;
2473 object_map.get_data_extents(start_object_no,
2474 end_object_no - start_object_no,
2475 &byte_offset, &byte_length);
2476
2477 bufferlist data_bl;
2478 r = cls_cxx_read2(hctx, object_map.get_header_length() + byte_offset,
2479 byte_length, &data_bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2480 if (r < 0) {
2481 CLS_ERR("object map data read failed");
2482 return r;
2483 }
2484
2485 try {
2486 bufferlist::iterator it = data_bl.begin();
2487 object_map.decode_data(it, byte_offset);
2488 } catch (const buffer::error &err) {
2489 CLS_ERR("failed to decode data chunk [%" PRIu64 "]: %s",
2490 byte_offset, err.what());
2491 return -EINVAL;
2492 }
2493
2494 bool updated = false;
2495 for (uint64_t object_no = start_object_no; object_no < end_object_no;
2496 ++object_no) {
2497 uint8_t state = object_map[object_no];
2498 if ((!current_object_state || state == *current_object_state ||
2499 (*current_object_state == OBJECT_EXISTS &&
2500 state == OBJECT_EXISTS_CLEAN)) && state != new_object_state) {
2501 object_map[object_no] = new_object_state;
2502 updated = true;
2503 }
2504 }
2505
2506 if (updated) {
2507 CLS_LOG(20, "object_map_update: %" PRIu64 "~%" PRIu64 " -> %" PRIu64,
2508 byte_offset, byte_length,
2509 object_map.get_header_length() + byte_offset);
2510
2511 bufferlist data_bl;
2512 object_map.encode_data(data_bl, byte_offset, byte_length);
2513 r = cls_cxx_write2(hctx, object_map.get_header_length() + byte_offset,
2514 data_bl.length(), &data_bl,
2515 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2516 if (r < 0) {
2517 CLS_ERR("failed to write object map header: %s", cpp_strerror(r).c_str());
2518 return r;
2519 }
2520
2521 footer_bl.clear();
2522 object_map.encode_footer(footer_bl);
2523 r = cls_cxx_write2(hctx, object_map.get_footer_offset(), footer_bl.length(),
2524 &footer_bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2525 if (r < 0) {
2526 CLS_ERR("failed to write object map footer: %s", cpp_strerror(r).c_str());
2527 return r;
2528 }
2529 } else {
2530 CLS_LOG(20, "object_map_update: no update necessary");
2531 }
2532
2533 return 0;
2534}
2535
2536/**
2537 * Mark all _EXISTS objects as _EXISTS_CLEAN so future writes to the
2538 * image HEAD can be tracked.
2539 *
2540 * Input:
2541 * none
2542 *
2543 * Output:
2544 * @returns 0 on success, negative error code on failure
2545 */
2546int object_map_snap_add(cls_method_context_t hctx, bufferlist *in,
2547 bufferlist *out)
2548{
2549 BitVector<2> object_map;
2550 int r = object_map_read(hctx, object_map);
2551 if (r < 0) {
2552 return r;
2553 }
2554
2555 bool updated = false;
2556 for (uint64_t i = 0; i < object_map.size(); ++i) {
2557 if (object_map[i] == OBJECT_EXISTS) {
2558 object_map[i] = OBJECT_EXISTS_CLEAN;
2559 updated = true;
2560 }
2561 }
2562
2563 if (updated) {
2564 bufferlist bl;
2565 ::encode(object_map, bl);
2566 r = cls_cxx_write_full(hctx, &bl);
2567 }
2568 return r;
2569}
2570
2571/**
2572 * Mark all _EXISTS_CLEAN objects as _EXISTS in the current object map
2573 * if the provided snapshot object map object is marked as _EXISTS.
2574 *
2575 * Input:
2576 * @param snapshot object map bit vector
2577 *
2578 * Output:
2579 * @returns 0 on success, negative error code on failure
2580 */
2581int object_map_snap_remove(cls_method_context_t hctx, bufferlist *in,
2582 bufferlist *out)
2583{
2584 BitVector<2> src_object_map;
2585 try {
2586 bufferlist::iterator iter = in->begin();
2587 ::decode(src_object_map, iter);
2588 } catch (const buffer::error &err) {
2589 return -EINVAL;
2590 }
2591
2592 BitVector<2> dst_object_map;
2593 int r = object_map_read(hctx, dst_object_map);
2594 if (r < 0) {
2595 return r;
2596 }
2597
2598 bool updated = false;
2599 for (uint64_t i = 0; i < dst_object_map.size(); ++i) {
2600 if (dst_object_map[i] == OBJECT_EXISTS_CLEAN &&
2601 (i >= src_object_map.size() || src_object_map[i] == OBJECT_EXISTS)) {
2602 dst_object_map[i] = OBJECT_EXISTS;
2603 updated = true;
2604 }
2605 }
2606
2607 if (updated) {
2608 bufferlist bl;
2609 ::encode(dst_object_map, bl);
2610 r = cls_cxx_write_full(hctx, &bl);
2611 }
2612 return r;
2613}
2614
2615static const string metadata_key_for_name(const string &name)
2616{
2617 return RBD_METADATA_KEY_PREFIX + name;
2618}
2619
2620static const string metadata_name_from_key(const string &key)
2621{
2622 return key.substr(strlen(RBD_METADATA_KEY_PREFIX));
2623}
2624
2625/**
2626 * Input:
2627 * @param start_after which name to begin listing after
2628 * (use the empty string to start at the beginning)
2629 * @param max_return the maximum number of names to list(if 0 means no limit)
2630
2631 * Output:
2632 * @param value
2633 * @returns 0 on success, negative error code on failure
2634 */
2635int metadata_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2636{
2637 string start_after;
2638 uint64_t max_return;
2639
2640 try {
2641 bufferlist::iterator iter = in->begin();
2642 ::decode(start_after, iter);
2643 ::decode(max_return, iter);
2644 } catch (const buffer::error &err) {
2645 return -EINVAL;
2646 }
2647
2648 map<string, bufferlist> data;
2649 string last_read = metadata_key_for_name(start_after);
2650 int max_read = max_return ? MIN(RBD_MAX_KEYS_READ, max_return) : RBD_MAX_KEYS_READ;
2651
2652 do {
2653 map<string, bufferlist> raw_data;
2654 int r = cls_cxx_map_get_vals(hctx, last_read, RBD_METADATA_KEY_PREFIX,
2655 max_read, &raw_data);
2656 if (r < 0) {
2657 CLS_ERR("failed to read the vals off of disk: %s", cpp_strerror(r).c_str());
2658 return r;
2659 }
2660 if (raw_data.empty())
2661 break;
2662
2663 map<string, bufferlist>::iterator it = raw_data.begin();
2664 for (; it != raw_data.end(); ++it)
2665 data[metadata_name_from_key(it->first)].swap(it->second);
2666
2667 if (r < max_read)
2668 break;
2669
2670 last_read = raw_data.rbegin()->first;
2671 if (max_return)
2672 max_read = MIN(RBD_MAX_KEYS_READ, max_return - data.size());
2673 } while (max_read);
2674
2675 ::encode(data, *out);
2676 return 0;
2677}
2678
2679/**
2680 * Input:
2681 * @param data <map(key, value)>
2682 *
2683 * Output:
2684 * @returns 0 on success, negative error code on failure
2685 */
2686int metadata_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2687{
2688 map<string, bufferlist> data, raw_data;
2689
2690 bufferlist::iterator iter = in->begin();
2691 try {
2692 ::decode(data, iter);
2693 } catch (const buffer::error &err) {
2694 return -EINVAL;
2695 }
2696
2697 for (map<string, bufferlist>::iterator it = data.begin();
2698 it != data.end(); ++it) {
2699 CLS_LOG(20, "metdata_set key=%s value=%.*s", it->first.c_str(),
2700 it->second.length(), it->second.c_str());
2701 raw_data[metadata_key_for_name(it->first)].swap(it->second);
2702 }
2703 int r = cls_cxx_map_set_vals(hctx, &raw_data);
2704 if (r < 0) {
2705 CLS_ERR("error writing metadata: %s", cpp_strerror(r).c_str());
2706 return r;
2707 }
2708
2709 return 0;
2710}
2711
2712/**
2713 * Input:
2714 * @param key
2715 *
2716 * Output:
2717 * @returns 0 on success, negative error code on failure
2718 */
2719int metadata_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2720{
2721 string key;
2722
2723 bufferlist::iterator iter = in->begin();
2724 try {
2725 ::decode(key, iter);
2726 } catch (const buffer::error &err) {
2727 return -EINVAL;
2728 }
2729
2730 CLS_LOG(20, "metdata_set key=%s", key.c_str());
2731
2732 int r = cls_cxx_map_remove_key(hctx, metadata_key_for_name(key));
2733 if (r < 0) {
2734 CLS_ERR("error remove metadata: %s", cpp_strerror(r).c_str());
2735 return r;
2736 }
2737
2738 return 0;
2739}
2740
2741/**
2742 * Input:
2743 * @param key
2744 *
2745 * Output:
2746 * @param metadata value associated with the key
2747 * @returns 0 on success, negative error code on failure
2748 */
2749int metadata_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2750{
2751 string key;
2752 bufferlist value;
2753
2754 bufferlist::iterator iter = in->begin();
2755 try {
2756 ::decode(key, iter);
2757 } catch (const buffer::error &err) {
2758 return -EINVAL;
2759 }
2760
2761 CLS_LOG(20, "metdata_get key=%s", key.c_str());
2762
2763 int r = cls_cxx_map_get_val(hctx, metadata_key_for_name(key), &value);
2764 if (r < 0) {
2765 CLS_ERR("error get metadata: %s", cpp_strerror(r).c_str());
2766 return r;
2767 }
2768
2769 ::encode(value, *out);
2770 return 0;
2771}
2772
2773int snapshot_get_limit(cls_method_context_t hctx, bufferlist *in,
2774 bufferlist *out)
2775{
2776 uint64_t snap_limit;
2777 int r = read_key(hctx, "snap_limit", &snap_limit);
2778 if (r == -ENOENT) {
2779 snap_limit = UINT64_MAX;
2780 } else if (r < 0) {
2781 CLS_ERR("error retrieving snapshot limit: %s", cpp_strerror(r).c_str());
2782 return r;
2783 }
2784
2785 CLS_LOG(20, "read snapshot limit %" PRIu64, snap_limit);
2786 ::encode(snap_limit, *out);
2787
2788 return 0;
2789}
2790
2791int snapshot_set_limit(cls_method_context_t hctx, bufferlist *in,
2792 bufferlist *out)
2793{
2794 int rc;
2795 uint64_t new_limit;
2796 bufferlist bl;
2797
2798 try {
2799 bufferlist::iterator iter = in->begin();
2800 ::decode(new_limit, iter);
2801 } catch (const buffer::error &err) {
2802 return -EINVAL;
2803 }
2804
2805 if (new_limit == UINT64_MAX) {
2806 CLS_LOG(20, "remove snapshot limit\n");
2807 rc = cls_cxx_map_remove_key(hctx, "snap_limit");
2808 } else {
2809 CLS_LOG(20, "set snapshot limit to %" PRIu64 "\n", new_limit);
2810 ::encode(new_limit, bl);
2811 rc = cls_cxx_map_set_val(hctx, "snap_limit", &bl);
2812 }
2813
2814 return rc;
2815}
2816
2817
2818/****************************** Old format *******************************/
2819
2820int old_snapshots_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2821{
2822 bufferlist bl;
2823 struct rbd_obj_header_ondisk *header;
2824 int rc = snap_read_header(hctx, bl);
2825 if (rc < 0)
2826 return rc;
2827
2828 header = (struct rbd_obj_header_ondisk *)bl.c_str();
2829 bufferptr p(header->snap_names_len);
2830 char *buf = (char *)header;
2831 char *name = buf + sizeof(*header) + header->snap_count * sizeof(struct rbd_obj_snap_ondisk);
2832 char *end = name + header->snap_names_len;
2833 memcpy(p.c_str(),
2834 buf + sizeof(*header) + header->snap_count * sizeof(struct rbd_obj_snap_ondisk),
2835 header->snap_names_len);
2836
2837 ::encode(header->snap_seq, *out);
2838 ::encode(header->snap_count, *out);
2839
2840 for (unsigned i = 0; i < header->snap_count; i++) {
2841 string s = name;
2842 ::encode(header->snaps[i].id, *out);
2843 ::encode(header->snaps[i].image_size, *out);
2844 ::encode(s, *out);
2845
2846 name += strlen(name) + 1;
2847 if (name > end)
2848 return -EIO;
2849 }
2850
2851 return 0;
2852}
2853
2854int old_snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2855{
2856 bufferlist bl;
2857 struct rbd_obj_header_ondisk *header;
2858 bufferlist newbl;
2859 bufferptr header_bp(sizeof(*header));
2860 struct rbd_obj_snap_ondisk *new_snaps;
2861
2862 int rc = snap_read_header(hctx, bl);
2863 if (rc < 0)
2864 return rc;
2865
2866 header = (struct rbd_obj_header_ondisk *)bl.c_str();
2867
2868 int snaps_id_ofs = sizeof(*header);
2869 int names_ofs = snaps_id_ofs + sizeof(*new_snaps) * header->snap_count;
2870 const char *snap_name;
2871 const char *snap_names = ((char *)header) + names_ofs;
2872 const char *end = snap_names + header->snap_names_len;
2873 bufferlist::iterator iter = in->begin();
2874 string s;
2875 uint64_t snap_id;
2876
2877 try {
2878 ::decode(s, iter);
2879 ::decode(snap_id, iter);
2880 } catch (const buffer::error &err) {
2881 return -EINVAL;
2882 }
2883 snap_name = s.c_str();
2884
2885 if (header->snap_seq > snap_id)
2886 return -ESTALE;
2887
2888 uint64_t snap_limit;
2889 rc = read_key(hctx, "snap_limit", &snap_limit);
2890 if (rc == -ENOENT) {
2891 snap_limit = UINT64_MAX;
2892 } else if (rc < 0) {
2893 return rc;
2894 }
2895
2896 if (header->snap_count >= snap_limit)
2897 return -EDQUOT;
2898
2899 const char *cur_snap_name;
2900 for (cur_snap_name = snap_names; cur_snap_name < end; cur_snap_name += strlen(cur_snap_name) + 1) {
2901 if (strncmp(cur_snap_name, snap_name, end - cur_snap_name) == 0)
2902 return -EEXIST;
2903 }
2904 if (cur_snap_name > end)
2905 return -EIO;
2906
2907 int snap_name_len = strlen(snap_name);
2908
2909 bufferptr new_names_bp(header->snap_names_len + snap_name_len + 1);
2910 bufferptr new_snaps_bp(sizeof(*new_snaps) * (header->snap_count + 1));
2911
2912 /* copy snap names and append to new snap name */
2913 char *new_snap_names = new_names_bp.c_str();
2914 strcpy(new_snap_names, snap_name);
2915 memcpy(new_snap_names + snap_name_len + 1, snap_names, header->snap_names_len);
2916
2917 /* append new snap id */
2918 new_snaps = (struct rbd_obj_snap_ondisk *)new_snaps_bp.c_str();
2919 memcpy(new_snaps + 1, header->snaps, sizeof(*new_snaps) * header->snap_count);
2920
2921 header->snap_count = header->snap_count + 1;
2922 header->snap_names_len = header->snap_names_len + snap_name_len + 1;
2923 header->snap_seq = snap_id;
2924
2925 new_snaps[0].id = snap_id;
2926 new_snaps[0].image_size = header->image_size;
2927
2928 memcpy(header_bp.c_str(), header, sizeof(*header));
2929
2930 newbl.push_back(header_bp);
2931 newbl.push_back(new_snaps_bp);
2932 newbl.push_back(new_names_bp);
2933
2934 rc = cls_cxx_write_full(hctx, &newbl);
2935 if (rc < 0)
2936 return rc;
2937
2938 return 0;
2939}
2940
2941int old_snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2942{
2943 bufferlist bl;
2944 struct rbd_obj_header_ondisk *header;
2945 bufferlist newbl;
2946 bufferptr header_bp(sizeof(*header));
2947
2948 int rc = snap_read_header(hctx, bl);
2949 if (rc < 0)
2950 return rc;
2951
2952 header = (struct rbd_obj_header_ondisk *)bl.c_str();
2953
2954 int snaps_id_ofs = sizeof(*header);
2955 int names_ofs = snaps_id_ofs + sizeof(struct rbd_obj_snap_ondisk) * header->snap_count;
2956 const char *snap_name;
2957 const char *snap_names = ((char *)header) + names_ofs;
2958 const char *orig_names = snap_names;
2959 const char *end = snap_names + header->snap_names_len;
2960 bufferlist::iterator iter = in->begin();
2961 string s;
2962 unsigned i;
2963 bool found = false;
2964 struct rbd_obj_snap_ondisk snap;
2965
2966 try {
2967 ::decode(s, iter);
2968 } catch (const buffer::error &err) {
2969 return -EINVAL;
2970 }
2971 snap_name = s.c_str();
2972
2973 for (i = 0; snap_names < end; i++) {
2974 if (strcmp(snap_names, snap_name) == 0) {
2975 snap = header->snaps[i];
2976 found = true;
2977 break;
2978 }
2979 snap_names += strlen(snap_names) + 1;
2980 }
2981 if (!found) {
2982 CLS_ERR("couldn't find snap %s\n", snap_name);
2983 return -ENOENT;
2984 }
2985
2986 header->snap_names_len = header->snap_names_len - (s.length() + 1);
2987 header->snap_count = header->snap_count - 1;
2988
2989 bufferptr new_names_bp(header->snap_names_len);
2990 bufferptr new_snaps_bp(sizeof(header->snaps[0]) * header->snap_count);
2991
2992 memcpy(header_bp.c_str(), header, sizeof(*header));
2993 newbl.push_back(header_bp);
2994
2995 if (header->snap_count) {
2996 int snaps_len = 0;
2997 int names_len = 0;
2998 CLS_LOG(20, "i=%u\n", i);
2999 if (i > 0) {
3000 snaps_len = sizeof(header->snaps[0]) * i;
3001 names_len = snap_names - orig_names;
3002 memcpy(new_snaps_bp.c_str(), header->snaps, snaps_len);
3003 memcpy(new_names_bp.c_str(), orig_names, names_len);
3004 }
3005 snap_names += s.length() + 1;
3006
3007 if (i < header->snap_count) {
3008 memcpy(new_snaps_bp.c_str() + snaps_len,
3009 header->snaps + i + 1,
3010 sizeof(header->snaps[0]) * (header->snap_count - i));
3011 memcpy(new_names_bp.c_str() + names_len, snap_names , end - snap_names);
3012 }
3013 newbl.push_back(new_snaps_bp);
3014 newbl.push_back(new_names_bp);
3015 }
3016
3017 rc = cls_cxx_write_full(hctx, &newbl);
3018 if (rc < 0)
3019 return rc;
3020
3021 return 0;
3022}
3023
3024/**
3025 * rename snapshot of old format.
3026 *
3027 * Input:
3028 * @param src_snap_id old snap id of the snapshot (snapid_t)
3029 * @param dst_snap_name new name of the snapshot (string)
3030 *
3031 * Output:
3032 * @returns 0 on success, negative error code on failure.
3033*/
3034int old_snapshot_rename(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3035{
3036 bufferlist bl;
3037 struct rbd_obj_header_ondisk *header;
3038 bufferlist newbl;
3039 bufferptr header_bp(sizeof(*header));
3040 snapid_t src_snap_id;
3041 const char *dst_snap_name;
3042 string dst;
3043
3044 int rc = snap_read_header(hctx, bl);
3045 if (rc < 0)
3046 return rc;
3047
3048 header = (struct rbd_obj_header_ondisk *)bl.c_str();
3049
3050 int snaps_id_ofs = sizeof(*header);
3051 int names_ofs = snaps_id_ofs + sizeof(rbd_obj_snap_ondisk) * header->snap_count;
3052 const char *snap_names = ((char *)header) + names_ofs;
3053 const char *orig_names = snap_names;
3054 const char *end = snap_names + header->snap_names_len;
3055 bufferlist::iterator iter = in->begin();
3056 unsigned i;
3057 bool found = false;
3058
3059 try {
3060 ::decode(src_snap_id, iter);
3061 ::decode(dst, iter);
3062 } catch (const buffer::error &err) {
3063 return -EINVAL;
3064 }
3065 dst_snap_name = dst.c_str();
3066
3067 const char *cur_snap_name;
3068 for (cur_snap_name = snap_names; cur_snap_name < end;
3069 cur_snap_name += strlen(cur_snap_name) + 1) {
3070 if (strcmp(cur_snap_name, dst_snap_name) == 0)
3071 return -EEXIST;
3072 }
3073 if (cur_snap_name > end)
3074 return -EIO;
3075 for (i = 0; i < header->snap_count; i++) {
3076 if (src_snap_id == header->snaps[i].id) {
3077 found = true;
3078 break;
3079 }
3080 snap_names += strlen(snap_names) + 1;
3081 }
3082 if (!found) {
3083 CLS_ERR("couldn't find snap %llu\n", (unsigned long long)src_snap_id.val);
3084 return -ENOENT;
3085 }
3086
3087 CLS_LOG(20, "rename snap with snap id %llu to dest name %s", (unsigned long long)src_snap_id.val, dst_snap_name);
3088 header->snap_names_len = header->snap_names_len - strlen(snap_names) + dst.length();
3089
3090 bufferptr new_names_bp(header->snap_names_len);
3091 bufferptr new_snaps_bp(sizeof(header->snaps[0]) * header->snap_count);
3092
3093 if (header->snap_count) {
3094 int names_len = 0;
3095 CLS_LOG(20, "i=%u\n", i);
3096 if (i > 0) {
3097 names_len = snap_names - orig_names;
3098 memcpy(new_names_bp.c_str(), orig_names, names_len);
3099 }
3100 strcpy(new_names_bp.c_str() + names_len, dst_snap_name);
3101 names_len += strlen(dst_snap_name) + 1;
3102 snap_names += strlen(snap_names) + 1;
3103 if (i < header->snap_count) {
3104 memcpy(new_names_bp.c_str() + names_len, snap_names , end - snap_names);
3105 }
3106 memcpy(new_snaps_bp.c_str(), header->snaps, sizeof(header->snaps[0]) * header->snap_count);
3107 }
3108
3109 memcpy(header_bp.c_str(), header, sizeof(*header));
3110 newbl.push_back(header_bp);
3111 newbl.push_back(new_snaps_bp);
3112 newbl.push_back(new_names_bp);
3113
3114 rc = cls_cxx_write_full(hctx, &newbl);
3115 if (rc < 0)
3116 return rc;
3117 return 0;
3118}
3119
3120
3121namespace mirror {
3122
3123static const std::string UUID("mirror_uuid");
3124static const std::string MODE("mirror_mode");
3125static const std::string PEER_KEY_PREFIX("mirror_peer_");
3126static const std::string IMAGE_KEY_PREFIX("image_");
3127static const std::string GLOBAL_KEY_PREFIX("global_");
3128static const std::string STATUS_GLOBAL_KEY_PREFIX("status_global_");
3129static const std::string INSTANCE_KEY_PREFIX("instance_");
3130
3131std::string peer_key(const std::string &uuid) {
3132 return PEER_KEY_PREFIX + uuid;
3133}
3134
3135std::string image_key(const string &image_id) {
3136 return IMAGE_KEY_PREFIX + image_id;
3137}
3138
3139std::string global_key(const string &global_id) {
3140 return GLOBAL_KEY_PREFIX + global_id;
3141}
3142
3143std::string status_global_key(const string &global_id) {
3144 return STATUS_GLOBAL_KEY_PREFIX + global_id;
3145}
3146
3147std::string instance_key(const string &instance_id) {
3148 return INSTANCE_KEY_PREFIX + instance_id;
3149}
3150
3151int uuid_get(cls_method_context_t hctx, std::string *mirror_uuid) {
3152 bufferlist mirror_uuid_bl;
3153 int r = cls_cxx_map_get_val(hctx, mirror::UUID, &mirror_uuid_bl);
3154 if (r < 0) {
3155 if (r != -ENOENT) {
3156 CLS_ERR("error reading mirror uuid: %s", cpp_strerror(r).c_str());
3157 }
3158 return r;
3159 }
3160
3161 *mirror_uuid = std::string(mirror_uuid_bl.c_str(), mirror_uuid_bl.length());
3162 return 0;
3163}
3164
3165int read_peers(cls_method_context_t hctx,
3166 std::vector<cls::rbd::MirrorPeer> *peers) {
3167 std::string last_read = PEER_KEY_PREFIX;
3168 int max_read = RBD_MAX_KEYS_READ;
3169 int r = max_read;
3170 while (r == max_read) {
3171 std::map<std::string, bufferlist> vals;
3172 r = cls_cxx_map_get_vals(hctx, last_read, PEER_KEY_PREFIX.c_str(),
3173 max_read, &vals);
3174 if (r < 0) {
3175 CLS_ERR("error reading peers: %s", cpp_strerror(r).c_str());
3176 return r;
3177 }
3178
3179 for (auto &it : vals) {
3180 try {
3181 bufferlist::iterator bl_it = it.second.begin();
3182 cls::rbd::MirrorPeer peer;
3183 ::decode(peer, bl_it);
3184 peers->push_back(peer);
3185 } catch (const buffer::error &err) {
3186 CLS_ERR("could not decode peer '%s'", it.first.c_str());
3187 return -EIO;
3188 }
3189 }
3190
3191 if (!vals.empty()) {
3192 last_read = vals.rbegin()->first;
3193 }
3194 }
3195 return 0;
3196}
3197
3198int read_peer(cls_method_context_t hctx, const std::string &id,
3199 cls::rbd::MirrorPeer *peer) {
3200 bufferlist bl;
3201 int r = cls_cxx_map_get_val(hctx, peer_key(id), &bl);
3202 if (r < 0) {
3203 CLS_ERR("error reading peer '%s': %s", id.c_str(),
3204 cpp_strerror(r).c_str());
3205 return r;
3206 }
3207
3208 try {
3209 bufferlist::iterator bl_it = bl.begin();
3210 ::decode(*peer, bl_it);
3211 } catch (const buffer::error &err) {
3212 CLS_ERR("could not decode peer '%s'", id.c_str());
3213 return -EIO;
3214 }
3215 return 0;
3216}
3217
3218int write_peer(cls_method_context_t hctx, const std::string &id,
3219 const cls::rbd::MirrorPeer &peer) {
3220 bufferlist bl;
3221 ::encode(peer, bl);
3222
3223 int r = cls_cxx_map_set_val(hctx, peer_key(id), &bl);
3224 if (r < 0) {
3225 CLS_ERR("error writing peer '%s': %s", id.c_str(),
3226 cpp_strerror(r).c_str());
3227 return r;
3228 }
3229 return 0;
3230}
3231
3232int image_get(cls_method_context_t hctx, const string &image_id,
3233 cls::rbd::MirrorImage *mirror_image) {
3234 bufferlist bl;
3235 int r = cls_cxx_map_get_val(hctx, image_key(image_id), &bl);
3236 if (r < 0) {
3237 if (r != -ENOENT) {
3238 CLS_ERR("error reading mirrored image '%s': '%s'", image_id.c_str(),
3239 cpp_strerror(r).c_str());
3240 }
3241 return r;
3242 }
3243
3244 try {
3245 bufferlist::iterator it = bl.begin();
3246 ::decode(*mirror_image, it);
3247 } catch (const buffer::error &err) {
3248 CLS_ERR("could not decode mirrored image '%s'", image_id.c_str());
3249 return -EIO;
3250 }
3251
3252 return 0;
3253}
3254
3255int image_set(cls_method_context_t hctx, const string &image_id,
3256 const cls::rbd::MirrorImage &mirror_image) {
3257 bufferlist bl;
3258 ::encode(mirror_image, bl);
3259
3260 cls::rbd::MirrorImage existing_mirror_image;
3261 int r = image_get(hctx, image_id, &existing_mirror_image);
3262 if (r == -ENOENT) {
3263 // make sure global id doesn't already exist
3264 std::string global_id_key = global_key(mirror_image.global_image_id);
3265 std::string image_id;
3266 r = read_key(hctx, global_id_key, &image_id);
3267 if (r >= 0) {
3268 return -EEXIST;
3269 } else if (r != -ENOENT) {
3270 CLS_ERR("error reading global image id: '%s': '%s'", image_id.c_str(),
3271 cpp_strerror(r).c_str());
3272 return r;
3273 }
3274
3275 // make sure this was not a race for disabling
3276 if (mirror_image.state == cls::rbd::MIRROR_IMAGE_STATE_DISABLING) {
3277 CLS_ERR("image '%s' is already disabled", image_id.c_str());
3278 return r;
3279 }
3280 } else if (r < 0) {
3281 CLS_ERR("error reading mirrored image '%s': '%s'", image_id.c_str(),
3282 cpp_strerror(r).c_str());
3283 return r;
3284 } else if (existing_mirror_image.global_image_id !=
3285 mirror_image.global_image_id) {
3286 // cannot change the global id
3287 return -EINVAL;
3288 }
3289
3290 r = cls_cxx_map_set_val(hctx, image_key(image_id), &bl);
3291 if (r < 0) {
3292 CLS_ERR("error adding mirrored image '%s': %s", image_id.c_str(),
3293 cpp_strerror(r).c_str());
3294 return r;
3295 }
3296
3297 bufferlist image_id_bl;
3298 ::encode(image_id, image_id_bl);
3299 r = cls_cxx_map_set_val(hctx, global_key(mirror_image.global_image_id),
3300 &image_id_bl);
3301 if (r < 0) {
3302 CLS_ERR("error adding global id for image '%s': %s", image_id.c_str(),
3303 cpp_strerror(r).c_str());
3304 return r;
3305 }
3306 return 0;
3307}
3308
3309int image_remove(cls_method_context_t hctx, const string &image_id) {
3310 bufferlist bl;
3311 cls::rbd::MirrorImage mirror_image;
3312 int r = image_get(hctx, image_id, &mirror_image);
3313 if (r < 0) {
3314 if (r != -ENOENT) {
3315 CLS_ERR("error reading mirrored image '%s': '%s'", image_id.c_str(),
3316 cpp_strerror(r).c_str());
3317 }
3318 return r;
3319 }
3320
3321 if (mirror_image.state != cls::rbd::MIRROR_IMAGE_STATE_DISABLING) {
3322 return -EBUSY;
3323 }
3324
3325 r = cls_cxx_map_remove_key(hctx, image_key(image_id));
3326 if (r < 0) {
3327 CLS_ERR("error removing mirrored image '%s': %s", image_id.c_str(),
3328 cpp_strerror(r).c_str());
3329 return r;
3330 }
3331
3332 r = cls_cxx_map_remove_key(hctx, global_key(mirror_image.global_image_id));
3333 if (r < 0 && r != -ENOENT) {
3334 CLS_ERR("error removing global id for image '%s': %s", image_id.c_str(),
3335 cpp_strerror(r).c_str());
3336 return r;
3337 }
3338
3339 r = cls_cxx_map_remove_key(hctx,
3340 status_global_key(mirror_image.global_image_id));
3341 if (r < 0 && r != -ENOENT) {
3342 CLS_ERR("error removing global status for image '%s': %s", image_id.c_str(),
3343 cpp_strerror(r).c_str());
3344 return r;
3345 }
3346
3347 return 0;
3348}
3349
3350struct MirrorImageStatusOnDisk : cls::rbd::MirrorImageStatus {
3351 entity_inst_t origin;
3352
3353 MirrorImageStatusOnDisk() {
3354 }
3355 MirrorImageStatusOnDisk(const cls::rbd::MirrorImageStatus &status) :
3356 cls::rbd::MirrorImageStatus(status) {
3357 }
3358
3359 void encode_meta(bufferlist &bl, uint64_t features) const {
3360 ENCODE_START(1, 1, bl);
3361 ::encode(origin, bl, features);
3362 ENCODE_FINISH(bl);
3363 }
3364
3365 void encode(bufferlist &bl, uint64_t features) const {
3366 encode_meta(bl, features);
3367 cls::rbd::MirrorImageStatus::encode(bl);
3368 }
3369
3370 void decode_meta(bufferlist::iterator &it) {
3371 DECODE_START(1, it);
3372 ::decode(origin, it);
3373 DECODE_FINISH(it);
3374 }
3375
3376 void decode(bufferlist::iterator &it) {
3377 decode_meta(it);
3378 cls::rbd::MirrorImageStatus::decode(it);
3379 }
3380};
3381WRITE_CLASS_ENCODER_FEATURES(MirrorImageStatusOnDisk)
3382
3383int image_status_set(cls_method_context_t hctx, const string &global_image_id,
3384 const cls::rbd::MirrorImageStatus &status) {
3385 MirrorImageStatusOnDisk ondisk_status(status);
3386 ondisk_status.up = false;
3387 ondisk_status.last_update = ceph_clock_now();
3388
3389 int r = cls_get_request_origin(hctx, &ondisk_status.origin);
3390 assert(r == 0);
3391
3392 bufferlist bl;
3393 encode(ondisk_status, bl, cls_get_features(hctx));
3394
3395 r = cls_cxx_map_set_val(hctx, status_global_key(global_image_id), &bl);
3396 if (r < 0) {
3397 CLS_ERR("error setting status for mirrored image, global id '%s': %s",
3398 global_image_id.c_str(), cpp_strerror(r).c_str());
3399 return r;
3400 }
3401 return 0;
3402}
3403
3404int image_status_remove(cls_method_context_t hctx,
3405 const string &global_image_id) {
3406
3407 int r = cls_cxx_map_remove_key(hctx, status_global_key(global_image_id));
3408 if (r < 0) {
3409 CLS_ERR("error removing status for mirrored image, global id '%s': %s",
3410 global_image_id.c_str(), cpp_strerror(r).c_str());
3411 return r;
3412 }
3413 return 0;
3414}
3415
3416int image_status_get(cls_method_context_t hctx, const string &global_image_id,
3417 cls::rbd::MirrorImageStatus *status) {
3418
3419 bufferlist bl;
3420 int r = cls_cxx_map_get_val(hctx, status_global_key(global_image_id), &bl);
3421 if (r < 0) {
3422 if (r != -ENOENT) {
3423 CLS_ERR("error reading status for mirrored image, global id '%s': '%s'",
3424 global_image_id.c_str(), cpp_strerror(r).c_str());
3425 }
3426 return r;
3427 }
3428
3429 MirrorImageStatusOnDisk ondisk_status;
3430 try {
3431 bufferlist::iterator it = bl.begin();
3432 decode(ondisk_status, it);
3433 } catch (const buffer::error &err) {
3434 CLS_ERR("could not decode status for mirrored image, global id '%s'",
3435 global_image_id.c_str());
3436 return -EIO;
3437 }
3438
3439 obj_list_watch_response_t watchers;
3440 r = cls_cxx_list_watchers(hctx, &watchers);
3441 if (r < 0 && r != -ENOENT) {
3442 CLS_ERR("error listing watchers: '%s'", cpp_strerror(r).c_str());
3443 return r;
3444 }
3445
3446 *status = static_cast<cls::rbd::MirrorImageStatus>(ondisk_status);
3447 status->up = false;
3448 for (auto &w : watchers.entries) {
3449 if (w.name == ondisk_status.origin.name &&
3450 w.addr == ondisk_status.origin.addr) {
3451 status->up = true;
3452 break;
3453 }
3454 }
3455
3456 return 0;
3457}
3458
3459int image_status_list(cls_method_context_t hctx,
3460 const std::string &start_after, uint64_t max_return,
3461 map<std::string, cls::rbd::MirrorImage> *mirror_images,
3462 map<std::string, cls::rbd::MirrorImageStatus> *mirror_statuses) {
3463 std::string last_read = image_key(start_after);
3464 int max_read = RBD_MAX_KEYS_READ;
3465 int r = max_read;
3466
3467 while (r == max_read && mirror_images->size() < max_return) {
3468 std::map<std::string, bufferlist> vals;
3469 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
3470 r = cls_cxx_map_get_vals(hctx, last_read, IMAGE_KEY_PREFIX, max_read,
3471 &vals);
3472 if (r < 0) {
3473 CLS_ERR("error reading mirror image directory by name: %s",
3474 cpp_strerror(r).c_str());
3475 return r;
3476 }
3477
3478 for (auto it = vals.begin(); it != vals.end() &&
3479 mirror_images->size() < max_return; ++it) {
3480 const std::string &image_id = it->first.substr(IMAGE_KEY_PREFIX.size());
3481 cls::rbd::MirrorImage mirror_image;
3482 bufferlist::iterator iter = it->second.begin();
3483 try {
3484 ::decode(mirror_image, iter);
3485 } catch (const buffer::error &err) {
3486 CLS_ERR("could not decode mirror image payload of image '%s'",
3487 image_id.c_str());
3488 return -EIO;
3489 }
3490
3491 (*mirror_images)[image_id] = mirror_image;
3492
3493 cls::rbd::MirrorImageStatus status;
3494 int r1 = image_status_get(hctx, mirror_image.global_image_id, &status);
3495 if (r1 < 0) {
3496 continue;
3497 }
3498
3499 (*mirror_statuses)[image_id] = status;
3500 }
3501 if (!vals.empty()) {
3502 last_read = image_key(mirror_images->rbegin()->first);
3503 }
3504 }
3505
3506 return 0;
3507}
3508
3509int image_status_get_summary(cls_method_context_t hctx,
3510 std::map<cls::rbd::MirrorImageStatusState, int> *states) {
3511 obj_list_watch_response_t watchers_;
3512 int r = cls_cxx_list_watchers(hctx, &watchers_);
3513 if (r < 0) {
3514 if (r != -ENOENT) {
3515 CLS_ERR("error listing watchers: '%s'", cpp_strerror(r).c_str());
3516 }
3517 return r;
3518 }
3519
3520 set<entity_inst_t> watchers;
3521 for (auto &w : watchers_.entries) {
3522 watchers.insert(entity_inst_t(w.name, w.addr));
3523 }
3524
3525 states->clear();
3526
3527 string last_read = IMAGE_KEY_PREFIX;
3528 int max_read = RBD_MAX_KEYS_READ;
3529 r = max_read;
3530 while (r == max_read) {
3531 map<string, bufferlist> vals;
3532 r = cls_cxx_map_get_vals(hctx, last_read, IMAGE_KEY_PREFIX,
3533 max_read, &vals);
3534 if (r < 0) {
3535 CLS_ERR("error reading mirrored images: %s", cpp_strerror(r).c_str());
3536 return r;
3537 }
3538
3539 for (auto &list_it : vals) {
3540 const string &key = list_it.first;
3541
3542 if (0 != key.compare(0, IMAGE_KEY_PREFIX.size(), IMAGE_KEY_PREFIX)) {
3543 break;
3544 }
3545
3546 cls::rbd::MirrorImage mirror_image;
3547 bufferlist::iterator iter = list_it.second.begin();
3548 try {
3549 ::decode(mirror_image, iter);
3550 } catch (const buffer::error &err) {
3551 CLS_ERR("could not decode mirror image payload for key '%s'",
3552 key.c_str());
3553 return -EIO;
3554 }
3555
3556 cls::rbd::MirrorImageStatus status;
3557 image_status_get(hctx, mirror_image.global_image_id, &status);
3558
3559 cls::rbd::MirrorImageStatusState state = status.up ? status.state :
3560 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
3561 (*states)[state]++;
3562 }
3563
3564 if (!vals.empty()) {
3565 last_read = vals.rbegin()->first;
3566 }
3567 }
3568
3569 return 0;
3570}
3571
3572int image_status_remove_down(cls_method_context_t hctx) {
3573 obj_list_watch_response_t watchers_;
3574 int r = cls_cxx_list_watchers(hctx, &watchers_);
3575 if (r < 0) {
3576 if (r != -ENOENT) {
3577 CLS_ERR("error listing watchers: '%s'", cpp_strerror(r).c_str());
3578 }
3579 return r;
3580 }
3581
3582 set<entity_inst_t> watchers;
3583 for (auto &w : watchers_.entries) {
3584 watchers.insert(entity_inst_t(w.name, w.addr));
3585 }
3586
3587 string last_read = STATUS_GLOBAL_KEY_PREFIX;
3588 int max_read = RBD_MAX_KEYS_READ;
3589 r = max_read;
3590 while (r == max_read) {
3591 map<string, bufferlist> vals;
3592 r = cls_cxx_map_get_vals(hctx, last_read, STATUS_GLOBAL_KEY_PREFIX,
3593 max_read, &vals);
3594 if (r < 0) {
3595 CLS_ERR("error reading mirrored images: %s", cpp_strerror(r).c_str());
3596 return r;
3597 }
3598
3599 for (auto &list_it : vals) {
3600 const string &key = list_it.first;
3601
3602 if (0 != key.compare(0, STATUS_GLOBAL_KEY_PREFIX.size(),
3603 STATUS_GLOBAL_KEY_PREFIX)) {
3604 break;
3605 }
3606
3607 MirrorImageStatusOnDisk status;
3608 try {
3609 bufferlist::iterator it = list_it.second.begin();
3610 status.decode_meta(it);
3611 } catch (const buffer::error &err) {
3612 CLS_ERR("could not decode status metadata for mirrored image '%s'",
3613 key.c_str());
3614 return -EIO;
3615 }
3616
3617 if (watchers.find(status.origin) == watchers.end()) {
3618 CLS_LOG(20, "removing stale status object for key %s",
3619 key.c_str());
3620 int r1 = cls_cxx_map_remove_key(hctx, key);
3621 if (r1 < 0) {
3622 CLS_ERR("error removing stale status for key '%s': %s",
3623 key.c_str(), cpp_strerror(r1).c_str());
3624 return r1;
3625 }
3626 }
3627 }
3628
3629 if (!vals.empty()) {
3630 last_read = vals.rbegin()->first;
3631 }
3632 }
3633
3634 return 0;
3635}
3636
3637int instances_list(cls_method_context_t hctx,
3638 std::vector<std::string> *instance_ids) {
3639 std::string last_read = INSTANCE_KEY_PREFIX;
3640 int max_read = RBD_MAX_KEYS_READ;
3641 int r = max_read;
3642 while (r == max_read) {
3643 std::map<std::string, bufferlist> vals;
3644 r = cls_cxx_map_get_vals(hctx, last_read, INSTANCE_KEY_PREFIX.c_str(),
3645 max_read, &vals);
3646 if (r < 0) {
3647 if (r != -ENOENT) {
3648 CLS_ERR("error reading mirror instances: %s", cpp_strerror(r).c_str());
3649 }
3650 return r;
3651 }
3652
3653 for (auto &it : vals) {
3654 instance_ids->push_back(it.first.substr(INSTANCE_KEY_PREFIX.size()));
3655 }
3656
3657 if (!vals.empty()) {
3658 last_read = vals.rbegin()->first;
3659 }
3660 }
3661 return 0;
3662}
3663
3664int instances_add(cls_method_context_t hctx, const string &instance_id) {
3665 bufferlist bl;
3666
3667 int r = cls_cxx_map_set_val(hctx, instance_key(instance_id), &bl);
3668 if (r < 0) {
3669 CLS_ERR("error setting mirror instance %s: %s", instance_id.c_str(),
3670 cpp_strerror(r).c_str());
3671 return r;
3672 }
3673 return 0;
3674}
3675
3676int instances_remove(cls_method_context_t hctx, const string &instance_id) {
3677
3678 int r = cls_cxx_map_remove_key(hctx, instance_key(instance_id));
3679 if (r < 0) {
3680 CLS_ERR("error removing mirror instance %s: %s", instance_id.c_str(),
3681 cpp_strerror(r).c_str());
3682 return r;
3683 }
3684 return 0;
3685}
3686
3687} // namespace mirror
3688
3689/**
3690 * Input:
3691 * none
3692 *
3693 * Output:
3694 * @param uuid (std::string)
3695 * @returns 0 on success, negative error code on failure
3696 */
3697int mirror_uuid_get(cls_method_context_t hctx, bufferlist *in,
3698 bufferlist *out) {
3699 std::string mirror_uuid;
3700 int r = mirror::uuid_get(hctx, &mirror_uuid);
3701 if (r < 0) {
3702 return r;
3703 }
3704
3705 ::encode(mirror_uuid, *out);
3706 return 0;
3707}
3708
3709/**
3710 * Input:
3711 * @param mirror_uuid (std::string)
3712 *
3713 * Output:
3714 * @returns 0 on success, negative error code on failure
3715 */
3716int mirror_uuid_set(cls_method_context_t hctx, bufferlist *in,
3717 bufferlist *out) {
3718 std::string mirror_uuid;
3719 try {
3720 bufferlist::iterator bl_it = in->begin();
3721 ::decode(mirror_uuid, bl_it);
3722 } catch (const buffer::error &err) {
3723 return -EINVAL;
3724 }
3725
3726 if (mirror_uuid.empty()) {
3727 CLS_ERR("cannot set empty mirror uuid");
3728 return -EINVAL;
3729 }
3730
3731 uint32_t mirror_mode;
3732 int r = read_key(hctx, mirror::MODE, &mirror_mode);
3733 if (r < 0 && r != -ENOENT) {
3734 return r;
3735 } else if (r == 0 && mirror_mode != cls::rbd::MIRROR_MODE_DISABLED) {
3736 CLS_ERR("cannot set mirror uuid while mirroring enabled");
3737 return -EINVAL;
3738 }
3739
3740 bufferlist mirror_uuid_bl;
3741 mirror_uuid_bl.append(mirror_uuid);
3742 r = cls_cxx_map_set_val(hctx, mirror::UUID, &mirror_uuid_bl);
3743 if (r < 0) {
3744 CLS_ERR("failed to set mirror uuid");
3745 return r;
3746 }
3747 return 0;
3748}
3749
3750/**
3751 * Input:
3752 * none
3753 *
3754 * Output:
3755 * @param cls::rbd::MirrorMode (uint32_t)
3756 * @returns 0 on success, negative error code on failure
3757 */
3758int mirror_mode_get(cls_method_context_t hctx, bufferlist *in,
3759 bufferlist *out) {
3760 uint32_t mirror_mode_decode;
3761 int r = read_key(hctx, mirror::MODE, &mirror_mode_decode);
3762 if (r < 0) {
3763 return r;
3764 }
3765
3766 ::encode(mirror_mode_decode, *out);
3767 return 0;
3768}
3769
3770/**
3771 * Input:
3772 * @param mirror_mode (cls::rbd::MirrorMode) (uint32_t)
3773 *
3774 * Output:
3775 * @returns 0 on success, negative error code on failure
3776 */
3777int mirror_mode_set(cls_method_context_t hctx, bufferlist *in,
3778 bufferlist *out) {
3779 uint32_t mirror_mode_decode;
3780 try {
3781 bufferlist::iterator bl_it = in->begin();
3782 ::decode(mirror_mode_decode, bl_it);
3783 } catch (const buffer::error &err) {
3784 return -EINVAL;
3785 }
3786
3787 bool enabled;
3788 switch (static_cast<cls::rbd::MirrorMode>(mirror_mode_decode)) {
3789 case cls::rbd::MIRROR_MODE_DISABLED:
3790 enabled = false;
3791 break;
3792 case cls::rbd::MIRROR_MODE_IMAGE:
3793 case cls::rbd::MIRROR_MODE_POOL:
3794 enabled = true;
3795 break;
3796 default:
3797 CLS_ERR("invalid mirror mode: %d", mirror_mode_decode);
3798 return -EINVAL;
3799 }
3800
3801 int r;
3802 if (enabled) {
3803 std::string mirror_uuid;
3804 r = mirror::uuid_get(hctx, &mirror_uuid);
3805 if (r == -ENOENT) {
3806 return -EINVAL;
3807 } else if (r < 0) {
3808 return r;
3809 }
3810
3811 bufferlist bl;
3812 ::encode(mirror_mode_decode, bl);
3813
3814 r = cls_cxx_map_set_val(hctx, mirror::MODE, &bl);
3815 if (r < 0) {
3816 CLS_ERR("error enabling mirroring: %s", cpp_strerror(r).c_str());
3817 return r;
3818 }
3819 } else {
3820 std::vector<cls::rbd::MirrorPeer> peers;
3821 r = mirror::read_peers(hctx, &peers);
3822 if (r < 0 && r != -ENOENT) {
3823 return r;
3824 }
3825
3826 if (!peers.empty()) {
3827 CLS_ERR("mirroring peers still registered");
3828 return -EBUSY;
3829 }
3830
3831 r = remove_key(hctx, mirror::MODE);
3832 if (r < 0) {
3833 return r;
3834 }
3835
3836 r = remove_key(hctx, mirror::UUID);
3837 if (r < 0) {
3838 return r;
3839 }
3840 }
3841 return 0;
3842}
3843
3844/**
3845 * Input:
3846 * none
3847 *
3848 * Output:
3849 * @param std::vector<cls::rbd::MirrorPeer>: collection of peers
3850 * @returns 0 on success, negative error code on failure
3851 */
3852int mirror_peer_list(cls_method_context_t hctx, bufferlist *in,
3853 bufferlist *out) {
3854 std::vector<cls::rbd::MirrorPeer> peers;
3855 int r = mirror::read_peers(hctx, &peers);
3856 if (r < 0 && r != -ENOENT) {
3857 return r;
3858 }
3859
3860 ::encode(peers, *out);
3861 return 0;
3862}
3863
3864/**
3865 * Input:
3866 * @param mirror_peer (cls::rbd::MirrorPeer)
3867 *
3868 * Output:
3869 * @returns 0 on success, negative error code on failure
3870 */
3871int mirror_peer_add(cls_method_context_t hctx, bufferlist *in,
3872 bufferlist *out) {
3873 cls::rbd::MirrorPeer mirror_peer;
3874 try {
3875 bufferlist::iterator it = in->begin();
3876 ::decode(mirror_peer, it);
3877 } catch (const buffer::error &err) {
3878 return -EINVAL;
3879 }
3880
3881 uint32_t mirror_mode_decode;
3882 int r = read_key(hctx, mirror::MODE, &mirror_mode_decode);
3883 if (r < 0 && r != -ENOENT) {
3884 return r;
3885 } else if (r == -ENOENT ||
3886 mirror_mode_decode == cls::rbd::MIRROR_MODE_DISABLED) {
3887 CLS_ERR("mirroring must be enabled on the pool");
3888 return -EINVAL;
3889 } else if (!mirror_peer.is_valid()) {
3890 CLS_ERR("mirror peer is not valid");
3891 return -EINVAL;
3892 }
3893
3894 std::string mirror_uuid;
3895 r = mirror::uuid_get(hctx, &mirror_uuid);
3896 if (r < 0) {
3897 CLS_ERR("error retrieving mirroring uuid: %s", cpp_strerror(r).c_str());
3898 return r;
3899 } else if (mirror_peer.uuid == mirror_uuid) {
3900 CLS_ERR("peer uuid '%s' matches pool mirroring uuid",
3901 mirror_uuid.c_str());
3902 return -EINVAL;
3903 }
3904
3905 std::vector<cls::rbd::MirrorPeer> peers;
3906 r = mirror::read_peers(hctx, &peers);
3907 if (r < 0 && r != -ENOENT) {
3908 return r;
3909 }
3910
3911 for (auto const &peer : peers) {
3912 if (peer.uuid == mirror_peer.uuid) {
3913 CLS_ERR("peer uuid '%s' already exists",
3914 peer.uuid.c_str());
3915 return -ESTALE;
3916 } else if (peer.cluster_name == mirror_peer.cluster_name &&
3917 (peer.pool_id == -1 || mirror_peer.pool_id == -1 ||
3918 peer.pool_id == mirror_peer.pool_id)) {
3919 CLS_ERR("peer cluster name '%s' already exists",
3920 peer.cluster_name.c_str());
3921 return -EEXIST;
3922 }
3923 }
3924
3925 bufferlist bl;
3926 ::encode(mirror_peer, bl);
3927 r = cls_cxx_map_set_val(hctx, mirror::peer_key(mirror_peer.uuid),
3928 &bl);
3929 if (r < 0) {
3930 CLS_ERR("error adding peer: %s", cpp_strerror(r).c_str());
3931 return r;
3932 }
3933 return 0;
3934}
3935
3936/**
3937 * Input:
3938 * @param uuid (std::string)
3939 *
3940 * Output:
3941 * @returns 0 on success, negative error code on failure
3942 */
3943int mirror_peer_remove(cls_method_context_t hctx, bufferlist *in,
3944 bufferlist *out) {
3945 std::string uuid;
3946 try {
3947 bufferlist::iterator it = in->begin();
3948 ::decode(uuid, it);
3949 } catch (const buffer::error &err) {
3950 return -EINVAL;
3951 }
3952
3953 int r = cls_cxx_map_remove_key(hctx, mirror::peer_key(uuid));
3954 if (r < 0 && r != -ENOENT) {
3955 CLS_ERR("error removing peer: %s", cpp_strerror(r).c_str());
3956 return r;
3957 }
3958 return 0;
3959}
3960
3961/**
3962 * Input:
3963 * @param uuid (std::string)
3964 * @param client_name (std::string)
3965 *
3966 * Output:
3967 * @returns 0 on success, negative error code on failure
3968 */
3969int mirror_peer_set_client(cls_method_context_t hctx, bufferlist *in,
3970 bufferlist *out) {
3971 std::string uuid;
3972 std::string client_name;
3973 try {
3974 bufferlist::iterator it = in->begin();
3975 ::decode(uuid, it);
3976 ::decode(client_name, it);
3977 } catch (const buffer::error &err) {
3978 return -EINVAL;
3979 }
3980
3981 cls::rbd::MirrorPeer peer;
3982 int r = mirror::read_peer(hctx, uuid, &peer);
3983 if (r < 0) {
3984 return r;
3985 }
3986
3987 peer.client_name = client_name;
3988 r = mirror::write_peer(hctx, uuid, peer);
3989 if (r < 0) {
3990 return r;
3991 }
3992 return 0;
3993}
3994
3995/**
3996 * Input:
3997 * @param uuid (std::string)
3998 * @param cluster_name (std::string)
3999 *
4000 * Output:
4001 * @returns 0 on success, negative error code on failure
4002 */
4003int mirror_peer_set_cluster(cls_method_context_t hctx, bufferlist *in,
4004 bufferlist *out) {
4005 std::string uuid;
4006 std::string cluster_name;
4007 try {
4008 bufferlist::iterator it = in->begin();
4009 ::decode(uuid, it);
4010 ::decode(cluster_name, it);
4011 } catch (const buffer::error &err) {
4012 return -EINVAL;
4013 }
4014
4015 cls::rbd::MirrorPeer peer;
4016 int r = mirror::read_peer(hctx, uuid, &peer);
4017 if (r < 0) {
4018 return r;
4019 }
4020
4021 peer.cluster_name = cluster_name;
4022 r = mirror::write_peer(hctx, uuid, peer);
4023 if (r < 0) {
4024 return r;
4025 }
4026 return 0;
4027}
4028
4029/**
4030 * Input:
4031 * @param start_after which name to begin listing after
4032 * (use the empty string to start at the beginning)
4033 * @param max_return the maximum number of names to list
4034 *
4035 * Output:
4036 * @param std::map<std::string, std::string>: local id to global id map
4037 * @returns 0 on success, negative error code on failure
4038 */
4039int mirror_image_list(cls_method_context_t hctx, bufferlist *in,
4040 bufferlist *out) {
4041 std::string start_after;
4042 uint64_t max_return;
4043 try {
4044 bufferlist::iterator iter = in->begin();
4045 ::decode(start_after, iter);
4046 ::decode(max_return, iter);
4047 } catch (const buffer::error &err) {
4048 return -EINVAL;
4049 }
4050
4051 int max_read = RBD_MAX_KEYS_READ;
4052 int r = max_read;
4053 std::map<std::string, std::string> mirror_images;
4054 std::string last_read = mirror::image_key(start_after);
4055
4056 while (r == max_read && mirror_images.size() < max_return) {
4057 std::map<std::string, bufferlist> vals;
4058 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
4059 r = cls_cxx_map_get_vals(hctx, last_read, mirror::IMAGE_KEY_PREFIX,
4060 max_read, &vals);
4061 if (r < 0) {
4062 CLS_ERR("error reading mirror image directory by name: %s",
4063 cpp_strerror(r).c_str());
4064 return r;
4065 }
4066
4067 for (auto it = vals.begin(); it != vals.end(); ++it) {
4068 const std::string &image_id =
4069 it->first.substr(mirror::IMAGE_KEY_PREFIX.size());
4070 cls::rbd::MirrorImage mirror_image;
4071 bufferlist::iterator iter = it->second.begin();
4072 try {
4073 ::decode(mirror_image, iter);
4074 } catch (const buffer::error &err) {
4075 CLS_ERR("could not decode mirror image payload of image '%s'",
4076 image_id.c_str());
4077 return -EIO;
4078 }
4079
4080 mirror_images[image_id] = mirror_image.global_image_id;
4081 if (mirror_images.size() >= max_return) {
4082 break;
4083 }
4084 }
4085 if (!vals.empty()) {
4086 last_read = mirror::image_key(mirror_images.rbegin()->first);
4087 }
4088 }
4089
4090 ::encode(mirror_images, *out);
4091 return 0;
4092}
4093
4094/**
4095 * Input:
4096 * @param global_id (std::string)
4097 *
4098 * Output:
4099 * @param std::string - image id
4100 * @returns 0 on success, negative error code on failure
4101 */
4102int mirror_image_get_image_id(cls_method_context_t hctx, bufferlist *in,
4103 bufferlist *out) {
4104 std::string global_id;
4105 try {
4106 bufferlist::iterator it = in->begin();
4107 ::decode(global_id, it);
4108 } catch (const buffer::error &err) {
4109 return -EINVAL;
4110 }
4111
4112 std::string image_id;
4113 int r = read_key(hctx, mirror::global_key(global_id), &image_id);
4114 if (r < 0) {
4115 CLS_ERR("error retrieving image id for global id '%s': %s",
4116 global_id.c_str(), cpp_strerror(r).c_str());
4117 return r;
4118 }
4119
4120 ::encode(image_id, *out);
4121 return 0;
4122}
4123
4124/**
4125 * Input:
4126 * @param image_id (std::string)
4127 *
4128 * Output:
4129 * @param cls::rbd::MirrorImage - metadata associated with the image_id
4130 * @returns 0 on success, negative error code on failure
4131 */
4132int mirror_image_get(cls_method_context_t hctx, bufferlist *in,
4133 bufferlist *out) {
4134 string image_id;
4135 try {
4136 bufferlist::iterator it = in->begin();
4137 ::decode(image_id, it);
4138 } catch (const buffer::error &err) {
4139 return -EINVAL;
4140 }
4141
4142 cls::rbd::MirrorImage mirror_image;
4143 int r = mirror::image_get(hctx, image_id, &mirror_image);
4144 if (r < 0) {
4145 return r;
4146 }
4147
4148 ::encode(mirror_image, *out);
4149 return 0;
4150}
4151
4152/**
4153 * Input:
4154 * @param image_id (std::string)
4155 * @param mirror_image (cls::rbd::MirrorImage)
4156 *
4157 * Output:
4158 * @returns 0 on success, negative error code on failure
4159 * @returns -EEXIST if there's an existing image_id with a different global_image_id
4160 */
4161int mirror_image_set(cls_method_context_t hctx, bufferlist *in,
4162 bufferlist *out) {
4163 string image_id;
4164 cls::rbd::MirrorImage mirror_image;
4165 try {
4166 bufferlist::iterator it = in->begin();
4167 ::decode(image_id, it);
4168 ::decode(mirror_image, it);
4169 } catch (const buffer::error &err) {
4170 return -EINVAL;
4171 }
4172
4173 int r = mirror::image_set(hctx, image_id, mirror_image);
4174 if (r < 0) {
4175 return r;
4176 }
4177 return 0;
4178}
4179
4180/**
4181 * Input:
4182 * @param image_id (std::string)
4183 *
4184 * Output:
4185 * @returns 0 on success, negative error code on failure
4186 */
4187int mirror_image_remove(cls_method_context_t hctx, bufferlist *in,
4188 bufferlist *out) {
4189 string image_id;
4190 try {
4191 bufferlist::iterator it = in->begin();
4192 ::decode(image_id, it);
4193 } catch (const buffer::error &err) {
4194 return -EINVAL;
4195 }
4196
4197 int r = mirror::image_remove(hctx, image_id);
4198 if (r < 0) {
4199 return r;
4200 }
4201 return 0;
4202}
4203
4204/**
4205 * Input:
4206 * @param global_image_id (std::string)
4207 * @param status (cls::rbd::MirrorImageStatus)
4208 *
4209 * Output:
4210 * @returns 0 on success, negative error code on failure
4211 */
4212int mirror_image_status_set(cls_method_context_t hctx, bufferlist *in,
4213 bufferlist *out) {
4214 string global_image_id;
4215 cls::rbd::MirrorImageStatus status;
4216 try {
4217 bufferlist::iterator it = in->begin();
4218 ::decode(global_image_id, it);
4219 ::decode(status, it);
4220 } catch (const buffer::error &err) {
4221 return -EINVAL;
4222 }
4223
4224 int r = mirror::image_status_set(hctx, global_image_id, status);
4225 if (r < 0) {
4226 return r;
4227 }
4228 return 0;
4229}
4230
4231/**
4232 * Input:
4233 * @param global_image_id (std::string)
4234 *
4235 * Output:
4236 * @returns 0 on success, negative error code on failure
4237 */
4238int mirror_image_status_remove(cls_method_context_t hctx, bufferlist *in,
4239 bufferlist *out) {
4240 string global_image_id;
4241 try {
4242 bufferlist::iterator it = in->begin();
4243 ::decode(global_image_id, it);
4244 } catch (const buffer::error &err) {
4245 return -EINVAL;
4246 }
4247
4248 int r = mirror::image_status_remove(hctx, global_image_id);
4249 if (r < 0) {
4250 return r;
4251 }
4252 return 0;
4253}
4254
4255/**
4256 * Input:
4257 * @param global_image_id (std::string)
4258 *
4259 * Output:
4260 * @param cls::rbd::MirrorImageStatus - metadata associated with the global_image_id
4261 * @returns 0 on success, negative error code on failure
4262 */
4263int mirror_image_status_get(cls_method_context_t hctx, bufferlist *in,
4264 bufferlist *out) {
4265 string global_image_id;
4266 try {
4267 bufferlist::iterator it = in->begin();
4268 ::decode(global_image_id, it);
4269 } catch (const buffer::error &err) {
4270 return -EINVAL;
4271 }
4272
4273 cls::rbd::MirrorImageStatus status;
4274 int r = mirror::image_status_get(hctx, global_image_id, &status);
4275 if (r < 0) {
4276 return r;
4277 }
4278
4279 ::encode(status, *out);
4280 return 0;
4281}
4282
4283/**
4284 * Input:
4285 * @param start_after which name to begin listing after
4286 * (use the empty string to start at the beginning)
4287 * @param max_return the maximum number of names to list
4288 *
4289 * Output:
4290 * @param std::map<std::string, cls::rbd::MirrorImage>: image id to image map
4291 * @param std::map<std::string, cls::rbd::MirrorImageStatus>: image it to status map
4292 * @returns 0 on success, negative error code on failure
4293 */
4294int mirror_image_status_list(cls_method_context_t hctx, bufferlist *in,
4295 bufferlist *out) {
4296 std::string start_after;
4297 uint64_t max_return;
4298 try {
4299 bufferlist::iterator iter = in->begin();
4300 ::decode(start_after, iter);
4301 ::decode(max_return, iter);
4302 } catch (const buffer::error &err) {
4303 return -EINVAL;
4304 }
4305
4306 map<std::string, cls::rbd::MirrorImage> images;
4307 map<std::string, cls::rbd::MirrorImageStatus> statuses;
4308 int r = mirror::image_status_list(hctx, start_after, max_return, &images,
4309 &statuses);
4310 if (r < 0) {
4311 return r;
4312 }
4313
4314 ::encode(images, *out);
4315 ::encode(statuses, *out);
4316 return 0;
4317}
4318
4319/**
4320 * Input:
4321 * none
4322 *
4323 * Output:
4324 * @param std::map<cls::rbd::MirrorImageStatusState, int>: states counts
4325 * @returns 0 on success, negative error code on failure
4326 */
4327int mirror_image_status_get_summary(cls_method_context_t hctx, bufferlist *in,
4328 bufferlist *out) {
4329 std::map<cls::rbd::MirrorImageStatusState, int> states;
4330
4331 int r = mirror::image_status_get_summary(hctx, &states);
4332 if (r < 0) {
4333 return r;
4334 }
4335
4336 ::encode(states, *out);
4337 return 0;
4338}
4339
4340/**
4341 * Input:
4342 * none
4343 *
4344 * Output:
4345 * @returns 0 on success, negative error code on failure
4346 */
4347int mirror_image_status_remove_down(cls_method_context_t hctx, bufferlist *in,
4348 bufferlist *out) {
4349 int r = mirror::image_status_remove_down(hctx);
4350 if (r < 0) {
4351 return r;
4352 }
4353 return 0;
4354}
4355
4356/**
4357 * Input:
4358 * none
4359 *
4360 * Output:
4361 * @param std::vector<std::string>: instance ids
4362 * @returns 0 on success, negative error code on failure
4363 */
4364int mirror_instances_list(cls_method_context_t hctx, bufferlist *in,
4365 bufferlist *out) {
4366 std::vector<std::string> instance_ids;
4367
4368 int r = mirror::instances_list(hctx, &instance_ids);
4369 if (r < 0) {
4370 return r;
4371 }
4372
4373 ::encode(instance_ids, *out);
4374 return 0;
4375}
4376
4377/**
4378 * Input:
4379 * @param instance_id (std::string)
4380 *
4381 * Output:
4382 * @returns 0 on success, negative error code on failure
4383 */
4384int mirror_instances_add(cls_method_context_t hctx, bufferlist *in,
4385 bufferlist *out) {
4386 std::string instance_id;
4387 try {
4388 bufferlist::iterator iter = in->begin();
4389 ::decode(instance_id, iter);
4390 } catch (const buffer::error &err) {
4391 return -EINVAL;
4392 }
4393
4394 int r = mirror::instances_add(hctx, instance_id);
4395 if (r < 0) {
4396 return r;
4397 }
4398 return 0;
4399}
4400
4401/**
4402 * Input:
4403 * @param instance_id (std::string)
4404 *
4405 * Output:
4406 * @returns 0 on success, negative error code on failure
4407 */
4408int mirror_instances_remove(cls_method_context_t hctx, bufferlist *in,
4409 bufferlist *out) {
4410 std::string instance_id;
4411 try {
4412 bufferlist::iterator iter = in->begin();
4413 ::decode(instance_id, iter);
4414 } catch (const buffer::error &err) {
4415 return -EINVAL;
4416 }
4417
4418 int r = mirror::instances_remove(hctx, instance_id);
4419 if (r < 0) {
4420 return r;
4421 }
4422 return 0;
4423}
4424
4425/**
4426 * Initialize the header with basic metadata.
4427 * Everything is stored as key/value pairs as omaps in the header object.
4428 *
4429 * Input:
4430 * none
4431 *
4432 * Output:
4433 * @return 0 on success, negative error code on failure
4434 */
4435int group_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4436{
4437 bufferlist snap_seqbl;
4438 uint64_t snap_seq = 0;
4439 ::encode(snap_seq, snap_seqbl);
4440 int r = cls_cxx_map_set_val(hctx, GROUP_SNAP_SEQ, &snap_seqbl);
4441 if (r < 0)
4442 return r;
4443
4444 return 0;
4445}
4446
4447/**
4448 * List consistency groups from the directory.
4449 *
4450 * Input:
4451 * @param start_after (std::string)
4452 * @param max_return (int64_t)
4453 *
4454 * Output:
4455 * @param map of consistency groups (name, id)
4456 * @return 0 on success, negative error code on failure
4457 */
4458int group_dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4459{
4460 string start_after;
4461 uint64_t max_return;
4462
4463 try {
4464 bufferlist::iterator iter = in->begin();
4465 ::decode(start_after, iter);
4466 ::decode(max_return, iter);
4467 } catch (const buffer::error &err) {
4468 return -EINVAL;
4469 }
4470
4471 int max_read = RBD_MAX_KEYS_READ;
4472 int r = max_read;
4473 map<string, string> groups;
4474 string last_read = dir_key_for_name(start_after);
4475
4476 while (r == max_read && groups.size() < max_return) {
4477 map<string, bufferlist> vals;
4478 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
4479 r = cls_cxx_map_get_vals(hctx, last_read, RBD_DIR_NAME_KEY_PREFIX,
4480 max_read, &vals);
4481 if (r < 0) {
4482 CLS_ERR("error reading directory by name: %s", cpp_strerror(r).c_str());
4483 return r;
4484 }
4485
4486 for (pair<string, bufferlist> val: vals) {
4487 string id;
4488 bufferlist::iterator iter = val.second.begin();
4489 try {
4490 ::decode(id, iter);
4491 } catch (const buffer::error &err) {
4492 CLS_ERR("could not decode id of consistency group '%s'", val.first.c_str());
4493 return -EIO;
4494 }
4495 CLS_LOG(20, "adding '%s' -> '%s'", dir_name_from_key(val.first).c_str(), id.c_str());
4496 groups[dir_name_from_key(val.first)] = id;
4497 if (groups.size() >= max_return)
4498 break;
4499 }
4500 if (!vals.empty()) {
4501 last_read = dir_key_for_name(groups.rbegin()->first);
4502 }
4503 }
4504
4505 ::encode(groups, *out);
4506
4507 return 0;
4508}
4509
4510/**
4511 * Add a consistency group to the directory.
4512 *
4513 * Input:
4514 * @param name (std::string)
4515 * @param id (std::string)
4516 *
4517 * Output:
4518 * @return 0 on success, negative error code on failure
4519 */
4520int group_dir_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4521{
4522 int r = cls_cxx_create(hctx, false);
4523
4524 if (r < 0) {
4525 CLS_ERR("could not create consistency group directory: %s",
4526 cpp_strerror(r).c_str());
4527 return r;
4528 }
4529
4530 string name, id;
4531 try {
4532 bufferlist::iterator iter = in->begin();
4533 ::decode(name, iter);
4534 ::decode(id, iter);
4535 } catch (const buffer::error &err) {
4536 return -EINVAL;
4537 }
4538
4539 if (!name.size() || !is_valid_id(id)) {
4540 CLS_ERR("invalid consistency group name '%s' or id '%s'",
4541 name.c_str(), id.c_str());
4542 return -EINVAL;
4543 }
4544
4545 CLS_LOG(20, "group_dir_add name=%s id=%s", name.c_str(), id.c_str());
4546
4547 string tmp;
4548 string name_key = dir_key_for_name(name);
4549 string id_key = dir_key_for_id(id);
4550 r = read_key(hctx, name_key, &tmp);
4551 if (r != -ENOENT) {
4552 CLS_LOG(10, "name already exists");
4553 return -EEXIST;
4554 }
4555 r = read_key(hctx, id_key, &tmp);
4556 if (r != -ENOENT) {
4557 CLS_LOG(10, "id already exists");
4558 return -EBADF;
4559 }
4560 bufferlist id_bl, name_bl;
4561 ::encode(id, id_bl);
4562 ::encode(name, name_bl);
4563 map<string, bufferlist> omap_vals;
4564 omap_vals[name_key] = id_bl;
4565 omap_vals[id_key] = name_bl;
4566 return cls_cxx_map_set_vals(hctx, &omap_vals);
4567}
4568
4569/**
4570 * Remove a consistency group from the directory.
4571 *
4572 * Input:
4573 * @param name (std::string)
4574 * @param id (std::string)
4575 *
4576 * Output:
4577 * @return 0 on success, negative error code on failure
4578 */
4579int group_dir_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4580{
4581 string name, id;
4582 try {
4583 bufferlist::iterator iter = in->begin();
4584 ::decode(name, iter);
4585 ::decode(id, iter);
4586 } catch (const buffer::error &err) {
4587 return -EINVAL;
4588 }
4589
4590 CLS_LOG(20, "group_dir_remove name=%s id=%s", name.c_str(), id.c_str());
4591
4592 string stored_name, stored_id;
4593 string name_key = dir_key_for_name(name);
4594 string id_key = dir_key_for_id(id);
4595
4596 int r = read_key(hctx, name_key, &stored_id);
4597 if (r < 0) {
4598 if (r != -ENOENT)
4599 CLS_ERR("error reading name to id mapping: %s", cpp_strerror(r).c_str());
4600 return r;
4601 }
4602 r = read_key(hctx, id_key, &stored_name);
4603 if (r < 0) {
4604 if (r != -ENOENT)
4605 CLS_ERR("error reading id to name mapping: %s", cpp_strerror(r).c_str());
4606 return r;
4607 }
4608
4609 // check if this op raced with a rename
4610 if (stored_name != name || stored_id != id) {
4611 CLS_ERR("stored name '%s' and id '%s' do not match args '%s' and '%s'",
4612 stored_name.c_str(), stored_id.c_str(), name.c_str(), id.c_str());
4613 return -ESTALE;
4614 }
4615
4616 r = cls_cxx_map_remove_key(hctx, name_key);
4617 if (r < 0) {
4618 CLS_ERR("error removing name: %s", cpp_strerror(r).c_str());
4619 return r;
4620 }
4621
4622 r = cls_cxx_map_remove_key(hctx, id_key);
4623 if (r < 0) {
4624 CLS_ERR("error removing id: %s", cpp_strerror(r).c_str());
4625 return r;
4626 }
4627
4628 return 0;
4629}
4630
4631/**
4632 * Set state of an image in the consistency group.
4633 *
4634 * Input:
4635 * @param image_status (cls::rbd::GroupImageStatus)
4636 *
4637 * Output:
4638 * @return 0 on success, negative error code on failure
4639 */
4640int group_image_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4641{
4642 CLS_LOG(20, "group_image_set");
4643
4644 cls::rbd::GroupImageStatus st;
4645 try {
4646 bufferlist::iterator iter = in->begin();
4647 ::decode(st, iter);
4648 } catch (const buffer::error &err) {
4649 return -EINVAL;
4650 }
4651
4652 string image_key = st.spec.image_key();
4653
4654 bufferlist image_val_bl;
4655 ::encode(st.state, image_val_bl);
4656 int r = cls_cxx_map_set_val(hctx, image_key, &image_val_bl);
4657 if (r < 0) {
4658 return r;
4659 }
4660
4661 return 0;
4662}
4663
4664/**
4665 * Remove reference to an image from the consistency group.
4666 *
4667 * Input:
4668 * @param spec (cls::rbd::GroupImageSpec)
4669 *
4670 * Output:
4671 * @return 0 on success, negative error code on failure
4672 */
4673int group_image_remove(cls_method_context_t hctx,
4674 bufferlist *in, bufferlist *out)
4675{
4676 CLS_LOG(20, "group_image_remove");
4677 cls::rbd::GroupImageSpec spec;
4678 try {
4679 bufferlist::iterator iter = in->begin();
4680 ::decode(spec, iter);
4681 } catch (const buffer::error &err) {
4682 return -EINVAL;
4683 }
4684
4685 string image_key = spec.image_key();
4686
4687 int r = cls_cxx_map_remove_key(hctx, image_key);
4688 if (r < 0) {
4689 CLS_ERR("error removing image from group: %s", cpp_strerror(r).c_str());
4690 return r;
4691 }
4692
4693 return 0;
4694}
4695
4696/*
4697 * List images in the consistency group.
4698 *
4699 * Input:
4700 * @param start_after which name to begin listing after
4701 * (use the empty string to start at the beginning)
4702 * @param max_return the maximum number of names to list
4703 *
4704 * Output:
4705 * @param tuples of descriptions of the images: image_id, pool_id, image reference state.
4706 * @return 0 on success, negative error code on failure
4707 */
4708int group_image_list(cls_method_context_t hctx,
4709 bufferlist *in, bufferlist *out)
4710{
4711 CLS_LOG(20, "group_image_list");
4712 cls::rbd::GroupImageSpec start_after;
4713 uint64_t max_return;
4714 try {
4715 bufferlist::iterator iter = in->begin();
4716 ::decode(start_after, iter);
4717 ::decode(max_return, iter);
4718 } catch (const buffer::error &err) {
4719 return -EINVAL;
4720 }
4721
4722 int max_read = RBD_MAX_KEYS_READ;
4723 std::map<string, bufferlist> vals;
4724 string last_read = start_after.image_key();
4725 std::vector<cls::rbd::GroupImageStatus> res;
4726 int keys_read;
4727 do {
4728 keys_read = cls_cxx_map_get_vals(hctx, last_read,cls::rbd::RBD_GROUP_IMAGE_KEY_PREFIX,
4729 max_read, &vals);
4730 if (keys_read < 0)
4731 return keys_read;
4732
4733 for (map<string, bufferlist>::iterator it = vals.begin();
4734 it != vals.end() && res.size() < max_return; ++it) {
4735
4736 bufferlist::iterator iter = it->second.begin();
4737 cls::rbd::GroupImageLinkState state;
4738 try {
4739 ::decode(state, iter);
4740 } catch (const buffer::error &err) {
4741 CLS_ERR("error decoding state for image: %s", it->first.c_str());
4742 return -EIO;
4743 }
4744 cls::rbd::GroupImageSpec spec;
4745 int r = cls::rbd::GroupImageSpec::from_key(it->first, &spec);
4746 if (r < 0)
4747 return r;
4748
4749 CLS_LOG(20, "Discovered image %s %" PRId64 " %d", spec.image_id.c_str(),
4750 spec.pool_id,
4751 (int)state);
4752 res.push_back(cls::rbd::GroupImageStatus(spec, state));
4753 }
4754 if (res.size() > 0) {
4755 last_read = res.rbegin()->spec.image_key();
4756 }
4757
4758 } while ((keys_read == RBD_MAX_KEYS_READ) && (res.size() < max_return));
4759 ::encode(res, *out);
4760
4761 return 0;
4762}
4763
4764/**
4765 * Reference the consistency group this image belongs to.
4766 *
4767 * Input:
4768 * @param group_id (std::string)
4769 * @param pool_id (int64_t)
4770 *
4771 * Output:
4772 * @return 0 on success, negative error code on failure
4773 */
4774int image_add_group(cls_method_context_t hctx,
4775 bufferlist *in, bufferlist *out)
4776{
4777 CLS_LOG(20, "image_add_group");
4778 cls::rbd::GroupSpec new_group;
4779 try {
4780 bufferlist::iterator iter = in->begin();
4781 ::decode(new_group, iter);
4782 } catch (const buffer::error &err) {
4783 return -EINVAL;
4784 }
4785
4786 bufferlist existing_refbl;
4787
4788 int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &existing_refbl);
4789 if (r == 0) {
4790 // If we are trying to link this image to the same group then return success.
4791 // If this image already belongs to another group then abort.
4792 cls::rbd::GroupSpec old_group;
4793 try {
4794 bufferlist::iterator iter = existing_refbl.begin();
4795 ::decode(old_group, iter);
4796 } catch (const buffer::error &err) {
4797 return -EINVAL;
4798 }
4799
4800 if ((old_group.group_id != new_group.group_id)
4801 || (old_group.pool_id != new_group.pool_id)) {
4802 return -EEXIST;
4803 } else {
4804 return 0; // In this case the values are already correct
4805 }
4806 } else if (r < 0 && r != -ENOENT) { // No entry means this image is not a member of any consistency group. So, we can use it.
4807 return r;
4808 }
4809
4810 bufferlist refbl;
4811 ::encode(new_group, refbl);
4812 r = cls_cxx_map_set_val(hctx, RBD_GROUP_REF, &refbl);
4813
4814 if (r < 0) {
4815 return r;
4816 }
4817
4818 return 0;
4819}
4820
4821/**
4822 * Remove image's pointer to the consistency group.
4823 *
4824 * Input:
4825 * @param cg_id (std::string)
4826 * @param pool_id (int64_t)
4827 *
4828 * Output:
4829 * @return 0 on success, negative error code on failure
4830 */
4831int image_remove_group(cls_method_context_t hctx,
4832 bufferlist *in,
4833 bufferlist *out)
4834{
4835 CLS_LOG(20, "image_remove_group");
4836 cls::rbd::GroupSpec spec;
4837 try {
4838 bufferlist::iterator iter = in->begin();
4839 ::decode(spec, iter);
4840 } catch (const buffer::error &err) {
4841 return -EINVAL;
4842 }
4843
4844 bufferlist refbl;
4845 int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &refbl);
4846 if (r < 0) {
4847 return r;
4848 }
4849
4850 cls::rbd::GroupSpec ref_spec;
4851 bufferlist::iterator iter = refbl.begin();
4852 try {
4853 ::decode(ref_spec, iter);
4854 } catch (const buffer::error &err) {
4855 return -EINVAL;
4856 }
4857
4858 if (ref_spec.pool_id != spec.pool_id || ref_spec.group_id != spec.group_id) {
4859 return -EBADF;
4860 }
4861
4862 r = cls_cxx_map_remove_key(hctx, RBD_GROUP_REF);
4863 if (r < 0) {
4864 return r;
4865 }
4866
4867 return 0;
4868}
4869
4870/**
4871 * Retrieve the id and pool of the consistency group this image belongs to.
4872 *
4873 * Input:
4874 * none
4875 *
4876 * Output:
4877 * @param GroupSpec
4878 * @return 0 on success, negative error code on failure
4879 */
4880int image_get_group(cls_method_context_t hctx,
4881 bufferlist *in, bufferlist *out)
4882{
4883 CLS_LOG(20, "image_get_group");
4884 bufferlist refbl;
4885 int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &refbl);
4886 if (r < 0 && r != -ENOENT) {
4887 return r;
4888 }
4889
4890 cls::rbd::GroupSpec spec;
4891
4892 if (r != -ENOENT) {
4893 bufferlist::iterator iter = refbl.begin();
4894 try {
4895 ::decode(spec, iter);
4896 } catch (const buffer::error &err) {
4897 return -EINVAL;
4898 }
4899 }
4900
4901 ::encode(spec, *out);
4902 return 0;
4903}
4904
4905namespace trash {
4906
4907static const std::string IMAGE_KEY_PREFIX("id_");
4908
4909std::string image_key(const std::string &image_id) {
4910 return IMAGE_KEY_PREFIX + image_id;
4911}
4912
4913std::string image_id_from_key(const std::string &key) {
4914 return key.substr(IMAGE_KEY_PREFIX.size());
4915}
4916
4917} // namespace trash
4918
4919/**
4920 * Add an image entry to the rbd trash. Creates the trash object if
4921 * needed, and stores the trash spec information of the deleted image.
4922 *
4923 * Input:
4924 * @param id the id of the image
4925 * @param trash_spec the spec info of the deleted image
4926 *
4927 * Output:
4928 * @returns -EEXIST if the image id is already in the trash
4929 * @returns 0 on success, negative error code on failure
4930 */
4931int trash_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4932{
4933 int r = cls_cxx_create(hctx, false);
4934 if (r < 0) {
4935 CLS_ERR("could not create trash: %s", cpp_strerror(r).c_str());
4936 return r;
4937 }
4938
4939 string id;
4940 cls::rbd::TrashImageSpec trash_spec;
4941 try {
4942 bufferlist::iterator iter = in->begin();
4943 ::decode(id, iter);
4944 ::decode(trash_spec, iter);
4945 } catch (const buffer::error &err) {
4946 return -EINVAL;
4947 }
4948
4949 if (!is_valid_id(id)) {
4950 CLS_ERR("trash_add: invalid id '%s'", id.c_str());
4951 return -EINVAL;
4952 }
4953
4954 CLS_LOG(20, "trash_add id=%s", id.c_str());
4955
4956 string key = trash::image_key(id);
4957 cls::rbd::TrashImageSpec tmp;
4958 r = read_key(hctx, key, &tmp);
4959 if (r < 0 && r != -ENOENT) {
4960 CLS_ERR("could not read key %s entry from trash: %s", key.c_str(),
4961 cpp_strerror(r).c_str());
4962 return r;
4963 } else if (r == 0) {
4964 CLS_LOG(10, "id already exists");
4965 return -EEXIST;
4966 }
4967
4968 map<string, bufferlist> omap_vals;
4969 ::encode(trash_spec, omap_vals[key]);
4970 return cls_cxx_map_set_vals(hctx, &omap_vals);
4971}
4972
4973/**
4974 * Removes an image entry from the rbd trash object.
4975 * image.
4976 *
4977 * Input:
4978 * @param id the id of the image
4979 *
4980 * Output:
4981 * @returns -ENOENT if the image id does not exist in the trash
4982 * @returns 0 on success, negative error code on failure
4983 */
4984int trash_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4985{
4986 string id;
4987 try {
4988 bufferlist::iterator iter = in->begin();
4989 ::decode(id, iter);
4990 } catch (const buffer::error &err) {
4991 return -EINVAL;
4992 }
4993
4994 CLS_LOG(20, "trash_remove id=%s", id.c_str());
4995
4996 string key = trash::image_key(id);
4997 bufferlist tmp;
4998 int r = cls_cxx_map_get_val(hctx, key, &tmp);
4999 if (r < 0) {
5000 if (r != -ENOENT) {
5001 CLS_ERR("error reading entry key %s: %s", key.c_str(), cpp_strerror(r).c_str());
5002 }
5003 return r;
5004 }
5005
5006 r = cls_cxx_map_remove_key(hctx, key);
5007 if (r < 0) {
5008 CLS_ERR("error removing entry: %s", cpp_strerror(r).c_str());
5009 return r;
5010 }
5011
5012 return 0;
5013}
5014
5015/**
5016 * Returns the list of trash spec entries registered in the rbd_trash
5017 * object.
5018 *
5019 * Output:
5020 * @param data the map between image id and trash spec info
5021 *
5022 * @returns 0 on success, negative error code on failure
5023 */
5024int trash_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
5025{
5026 map<string, cls::rbd::TrashImageSpec> data;
5027 string last_read = trash::image_key("");
5028 int max_read = RBD_MAX_KEYS_READ;
5029
5030 CLS_LOG(20, "trash_get_images");
5031
5032 do {
5033 map<string, bufferlist> raw_data;
5034 int r = cls_cxx_map_get_vals(hctx, last_read, trash::IMAGE_KEY_PREFIX,
5035 max_read, &raw_data);
5036 if (r < 0) {
5037 CLS_ERR("failed to read the vals off of disk: %s", cpp_strerror(r).c_str());
5038 return r;
5039 }
5040 if (raw_data.empty()) {
5041 break;
5042 }
5043
5044 map<string, bufferlist>::iterator it = raw_data.begin();
5045 for (; it != raw_data.end(); ++it) {
5046 ::decode(data[trash::image_id_from_key(it->first)], it->second);
5047 }
5048
5049 if (r < max_read) {
5050 break;
5051 }
5052
5053 last_read = raw_data.rbegin()->first;
5054 } while (max_read);
5055
5056 ::encode(data, *out);
5057
5058 return 0;
5059}
5060
5061/**
5062 * Returns the trash spec entry of an image registered in the rbd_trash
5063 * object.
5064 *
5065 * Input:
5066 * @param id the id of the image
5067 *
5068 * Output:
5069 * @param out the trash spec entry
5070 *
5071 * @returns 0 on success, negative error code on failure
5072 */
5073int trash_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
5074{
5075 string id;
5076 try {
5077 bufferlist::iterator iter = in->begin();
5078 ::decode(id, iter);
5079 } catch (const buffer::error &err) {
5080 return -EINVAL;
5081 }
5082
5083 CLS_LOG(20, "trash_get_image id=%s", id.c_str());
5084
5085
5086 string key = trash::image_key(id);
5087 bufferlist bl;
5088 int r = cls_cxx_map_get_val(hctx, key, out);
5089 if (r != -ENOENT) {
5090 CLS_ERR("error reading image from trash '%s': '%s'", id.c_str(),
5091 cpp_strerror(r).c_str());
5092 }
5093 return r;
5094}
5095
5096CLS_INIT(rbd)
5097{
5098 CLS_LOG(20, "Loaded rbd class!");
5099
5100 cls_handle_t h_class;
5101 cls_method_handle_t h_create;
5102 cls_method_handle_t h_get_features;
5103 cls_method_handle_t h_set_features;
5104 cls_method_handle_t h_get_size;
5105 cls_method_handle_t h_set_size;
5106 cls_method_handle_t h_get_parent;
5107 cls_method_handle_t h_set_parent;
5108 cls_method_handle_t h_get_protection_status;
5109 cls_method_handle_t h_set_protection_status;
5110 cls_method_handle_t h_get_stripe_unit_count;
5111 cls_method_handle_t h_set_stripe_unit_count;
31f18b77 5112 cls_method_handle_t h_get_create_timestamp;
7c673cae
FG
5113 cls_method_handle_t h_get_flags;
5114 cls_method_handle_t h_set_flags;
5115 cls_method_handle_t h_remove_parent;
5116 cls_method_handle_t h_add_child;
5117 cls_method_handle_t h_remove_child;
5118 cls_method_handle_t h_get_children;
5119 cls_method_handle_t h_get_snapcontext;
5120 cls_method_handle_t h_get_object_prefix;
5121 cls_method_handle_t h_get_data_pool;
5122 cls_method_handle_t h_get_snapshot_name;
5123 cls_method_handle_t h_get_snapshot_namespace;
5124 cls_method_handle_t h_get_snapshot_timestamp;
5125 cls_method_handle_t h_snapshot_add;
5126 cls_method_handle_t h_snapshot_remove;
5127 cls_method_handle_t h_snapshot_rename;
5128 cls_method_handle_t h_get_all_features;
5129 cls_method_handle_t h_copyup;
5130 cls_method_handle_t h_get_id;
5131 cls_method_handle_t h_set_id;
5132 cls_method_handle_t h_dir_get_id;
5133 cls_method_handle_t h_dir_get_name;
5134 cls_method_handle_t h_dir_list;
5135 cls_method_handle_t h_dir_add_image;
5136 cls_method_handle_t h_dir_remove_image;
5137 cls_method_handle_t h_dir_rename_image;
5138 cls_method_handle_t h_object_map_load;
5139 cls_method_handle_t h_object_map_save;
5140 cls_method_handle_t h_object_map_resize;
5141 cls_method_handle_t h_object_map_update;
5142 cls_method_handle_t h_object_map_snap_add;
5143 cls_method_handle_t h_object_map_snap_remove;
5144 cls_method_handle_t h_metadata_set;
5145 cls_method_handle_t h_metadata_remove;
5146 cls_method_handle_t h_metadata_list;
5147 cls_method_handle_t h_metadata_get;
5148 cls_method_handle_t h_snapshot_get_limit;
5149 cls_method_handle_t h_snapshot_set_limit;
5150 cls_method_handle_t h_old_snapshots_list;
5151 cls_method_handle_t h_old_snapshot_add;
5152 cls_method_handle_t h_old_snapshot_remove;
5153 cls_method_handle_t h_old_snapshot_rename;
5154 cls_method_handle_t h_mirror_uuid_get;
5155 cls_method_handle_t h_mirror_uuid_set;
5156 cls_method_handle_t h_mirror_mode_get;
5157 cls_method_handle_t h_mirror_mode_set;
5158 cls_method_handle_t h_mirror_peer_list;
5159 cls_method_handle_t h_mirror_peer_add;
5160 cls_method_handle_t h_mirror_peer_remove;
5161 cls_method_handle_t h_mirror_peer_set_client;
5162 cls_method_handle_t h_mirror_peer_set_cluster;
5163 cls_method_handle_t h_mirror_image_list;
5164 cls_method_handle_t h_mirror_image_get_image_id;
5165 cls_method_handle_t h_mirror_image_get;
5166 cls_method_handle_t h_mirror_image_set;
5167 cls_method_handle_t h_mirror_image_remove;
5168 cls_method_handle_t h_mirror_image_status_set;
5169 cls_method_handle_t h_mirror_image_status_remove;
5170 cls_method_handle_t h_mirror_image_status_get;
5171 cls_method_handle_t h_mirror_image_status_list;
5172 cls_method_handle_t h_mirror_image_status_get_summary;
5173 cls_method_handle_t h_mirror_image_status_remove_down;
5174 cls_method_handle_t h_mirror_instances_list;
5175 cls_method_handle_t h_mirror_instances_add;
5176 cls_method_handle_t h_mirror_instances_remove;
5177 cls_method_handle_t h_group_create;
5178 cls_method_handle_t h_group_dir_list;
5179 cls_method_handle_t h_group_dir_add;
5180 cls_method_handle_t h_group_dir_remove;
5181 cls_method_handle_t h_group_image_remove;
5182 cls_method_handle_t h_group_image_list;
5183 cls_method_handle_t h_group_image_set;
5184 cls_method_handle_t h_image_add_group;
5185 cls_method_handle_t h_image_remove_group;
5186 cls_method_handle_t h_image_get_group;
5187 cls_method_handle_t h_trash_add;
5188 cls_method_handle_t h_trash_remove;
5189 cls_method_handle_t h_trash_list;
5190 cls_method_handle_t h_trash_get;
5191
5192 cls_register("rbd", &h_class);
5193 cls_register_cxx_method(h_class, "create",
5194 CLS_METHOD_RD | CLS_METHOD_WR,
5195 create, &h_create);
5196 cls_register_cxx_method(h_class, "get_features",
5197 CLS_METHOD_RD,
5198 get_features, &h_get_features);
5199 cls_register_cxx_method(h_class, "set_features",
5200 CLS_METHOD_RD | CLS_METHOD_WR,
5201 set_features, &h_set_features);
5202 cls_register_cxx_method(h_class, "get_size",
5203 CLS_METHOD_RD,
5204 get_size, &h_get_size);
5205 cls_register_cxx_method(h_class, "set_size",
5206 CLS_METHOD_RD | CLS_METHOD_WR,
5207 set_size, &h_set_size);
5208 cls_register_cxx_method(h_class, "get_snapcontext",
5209 CLS_METHOD_RD,
5210 get_snapcontext, &h_get_snapcontext);
5211 cls_register_cxx_method(h_class, "get_object_prefix",
5212 CLS_METHOD_RD,
5213 get_object_prefix, &h_get_object_prefix);
5214 cls_register_cxx_method(h_class, "get_data_pool", CLS_METHOD_RD,
5215 get_data_pool, &h_get_data_pool);
5216 cls_register_cxx_method(h_class, "get_snapshot_name",
5217 CLS_METHOD_RD,
5218 get_snapshot_name, &h_get_snapshot_name);
5219 cls_register_cxx_method(h_class, "get_snapshot_namespace",
5220 CLS_METHOD_RD,
5221 get_snapshot_namespace, &h_get_snapshot_namespace);
5222 cls_register_cxx_method(h_class, "get_snapshot_timestamp",
5223 CLS_METHOD_RD,
5224 get_snapshot_timestamp, &h_get_snapshot_timestamp);
5225 cls_register_cxx_method(h_class, "snapshot_add",
5226 CLS_METHOD_RD | CLS_METHOD_WR,
5227 snapshot_add, &h_snapshot_add);
5228 cls_register_cxx_method(h_class, "snapshot_remove",
5229 CLS_METHOD_RD | CLS_METHOD_WR,
5230 snapshot_remove, &h_snapshot_remove);
5231 cls_register_cxx_method(h_class, "snapshot_rename",
5232 CLS_METHOD_RD | CLS_METHOD_WR,
5233 snapshot_rename, &h_snapshot_rename);
5234 cls_register_cxx_method(h_class, "get_all_features",
5235 CLS_METHOD_RD,
5236 get_all_features, &h_get_all_features);
5237 cls_register_cxx_method(h_class, "copyup",
5238 CLS_METHOD_RD | CLS_METHOD_WR,
5239 copyup, &h_copyup);
5240 cls_register_cxx_method(h_class, "get_parent",
5241 CLS_METHOD_RD,
5242 get_parent, &h_get_parent);
5243 cls_register_cxx_method(h_class, "set_parent",
5244 CLS_METHOD_RD | CLS_METHOD_WR,
5245 set_parent, &h_set_parent);
5246 cls_register_cxx_method(h_class, "remove_parent",
5247 CLS_METHOD_RD | CLS_METHOD_WR,
5248 remove_parent, &h_remove_parent);
5249 cls_register_cxx_method(h_class, "set_protection_status",
5250 CLS_METHOD_RD | CLS_METHOD_WR,
5251 set_protection_status, &h_set_protection_status);
5252 cls_register_cxx_method(h_class, "get_protection_status",
5253 CLS_METHOD_RD,
5254 get_protection_status, &h_get_protection_status);
5255 cls_register_cxx_method(h_class, "get_stripe_unit_count",
5256 CLS_METHOD_RD,
5257 get_stripe_unit_count, &h_get_stripe_unit_count);
5258 cls_register_cxx_method(h_class, "set_stripe_unit_count",
5259 CLS_METHOD_RD | CLS_METHOD_WR,
5260 set_stripe_unit_count, &h_set_stripe_unit_count);
31f18b77
FG
5261 cls_register_cxx_method(h_class, "get_create_timestamp",
5262 CLS_METHOD_RD,
5263 get_create_timestamp, &h_get_create_timestamp);
7c673cae
FG
5264 cls_register_cxx_method(h_class, "get_flags",
5265 CLS_METHOD_RD,
5266 get_flags, &h_get_flags);
5267 cls_register_cxx_method(h_class, "set_flags",
5268 CLS_METHOD_RD | CLS_METHOD_WR,
5269 set_flags, &h_set_flags);
5270 cls_register_cxx_method(h_class, "metadata_list",
5271 CLS_METHOD_RD,
5272 metadata_list, &h_metadata_list);
5273 cls_register_cxx_method(h_class, "metadata_set",
5274 CLS_METHOD_RD | CLS_METHOD_WR,
5275 metadata_set, &h_metadata_set);
5276 cls_register_cxx_method(h_class, "metadata_remove",
5277 CLS_METHOD_RD | CLS_METHOD_WR,
5278 metadata_remove, &h_metadata_remove);
5279 cls_register_cxx_method(h_class, "metadata_get",
5280 CLS_METHOD_RD,
5281 metadata_get, &h_metadata_get);
5282 cls_register_cxx_method(h_class, "snapshot_get_limit",
5283 CLS_METHOD_RD,
5284 snapshot_get_limit, &h_snapshot_get_limit);
5285 cls_register_cxx_method(h_class, "snapshot_set_limit",
5286 CLS_METHOD_WR,
5287 snapshot_set_limit, &h_snapshot_set_limit);
5288
5289 /* methods for the rbd_children object */
5290 cls_register_cxx_method(h_class, "add_child",
5291 CLS_METHOD_RD | CLS_METHOD_WR,
5292 add_child, &h_add_child);
5293 cls_register_cxx_method(h_class, "remove_child",
5294 CLS_METHOD_RD | CLS_METHOD_WR,
5295 remove_child, &h_remove_child);
5296 cls_register_cxx_method(h_class, "get_children",
5297 CLS_METHOD_RD,
5298 get_children, &h_get_children);
5299
5300 /* methods for the rbd_id.$image_name objects */
5301 cls_register_cxx_method(h_class, "get_id",
5302 CLS_METHOD_RD,
5303 get_id, &h_get_id);
5304 cls_register_cxx_method(h_class, "set_id",
5305 CLS_METHOD_RD | CLS_METHOD_WR,
5306 set_id, &h_set_id);
5307
5308 /* methods for the rbd_directory object */
5309 cls_register_cxx_method(h_class, "dir_get_id",
5310 CLS_METHOD_RD,
5311 dir_get_id, &h_dir_get_id);
5312 cls_register_cxx_method(h_class, "dir_get_name",
5313 CLS_METHOD_RD,
5314 dir_get_name, &h_dir_get_name);
5315 cls_register_cxx_method(h_class, "dir_list",
5316 CLS_METHOD_RD,
5317 dir_list, &h_dir_list);
5318 cls_register_cxx_method(h_class, "dir_add_image",
5319 CLS_METHOD_RD | CLS_METHOD_WR,
5320 dir_add_image, &h_dir_add_image);
5321 cls_register_cxx_method(h_class, "dir_remove_image",
5322 CLS_METHOD_RD | CLS_METHOD_WR,
5323 dir_remove_image, &h_dir_remove_image);
5324 cls_register_cxx_method(h_class, "dir_rename_image",
5325 CLS_METHOD_RD | CLS_METHOD_WR,
5326 dir_rename_image, &h_dir_rename_image);
5327
5328 /* methods for the rbd_object_map.$image_id object */
5329 cls_register_cxx_method(h_class, "object_map_load",
5330 CLS_METHOD_RD,
5331 object_map_load, &h_object_map_load);
5332 cls_register_cxx_method(h_class, "object_map_save",
5333 CLS_METHOD_RD | CLS_METHOD_WR,
5334 object_map_save, &h_object_map_save);
5335 cls_register_cxx_method(h_class, "object_map_resize",
5336 CLS_METHOD_RD | CLS_METHOD_WR,
5337 object_map_resize, &h_object_map_resize);
5338 cls_register_cxx_method(h_class, "object_map_update",
5339 CLS_METHOD_RD | CLS_METHOD_WR,
5340 object_map_update, &h_object_map_update);
5341 cls_register_cxx_method(h_class, "object_map_snap_add",
5342 CLS_METHOD_RD | CLS_METHOD_WR,
5343 object_map_snap_add, &h_object_map_snap_add);
5344 cls_register_cxx_method(h_class, "object_map_snap_remove",
5345 CLS_METHOD_RD | CLS_METHOD_WR,
5346 object_map_snap_remove, &h_object_map_snap_remove);
5347
5348 /* methods for the old format */
5349 cls_register_cxx_method(h_class, "snap_list",
5350 CLS_METHOD_RD,
5351 old_snapshots_list, &h_old_snapshots_list);
5352 cls_register_cxx_method(h_class, "snap_add",
5353 CLS_METHOD_RD | CLS_METHOD_WR,
5354 old_snapshot_add, &h_old_snapshot_add);
5355 cls_register_cxx_method(h_class, "snap_remove",
5356 CLS_METHOD_RD | CLS_METHOD_WR,
5357 old_snapshot_remove, &h_old_snapshot_remove);
5358 cls_register_cxx_method(h_class, "snap_rename",
5359 CLS_METHOD_RD | CLS_METHOD_WR,
5360 old_snapshot_rename, &h_old_snapshot_rename);
5361
5362 /* methods for the rbd_mirroring object */
5363 cls_register_cxx_method(h_class, "mirror_uuid_get", CLS_METHOD_RD,
5364 mirror_uuid_get, &h_mirror_uuid_get);
5365 cls_register_cxx_method(h_class, "mirror_uuid_set",
5366 CLS_METHOD_RD | CLS_METHOD_WR,
5367 mirror_uuid_set, &h_mirror_uuid_set);
5368 cls_register_cxx_method(h_class, "mirror_mode_get", CLS_METHOD_RD,
5369 mirror_mode_get, &h_mirror_mode_get);
5370 cls_register_cxx_method(h_class, "mirror_mode_set",
5371 CLS_METHOD_RD | CLS_METHOD_WR,
5372 mirror_mode_set, &h_mirror_mode_set);
5373 cls_register_cxx_method(h_class, "mirror_peer_list", CLS_METHOD_RD,
5374 mirror_peer_list, &h_mirror_peer_list);
5375 cls_register_cxx_method(h_class, "mirror_peer_add",
5376 CLS_METHOD_RD | CLS_METHOD_WR,
5377 mirror_peer_add, &h_mirror_peer_add);
5378 cls_register_cxx_method(h_class, "mirror_peer_remove",
5379 CLS_METHOD_RD | CLS_METHOD_WR,
5380 mirror_peer_remove, &h_mirror_peer_remove);
5381 cls_register_cxx_method(h_class, "mirror_peer_set_client",
5382 CLS_METHOD_RD | CLS_METHOD_WR,
5383 mirror_peer_set_client, &h_mirror_peer_set_client);
5384 cls_register_cxx_method(h_class, "mirror_peer_set_cluster",
5385 CLS_METHOD_RD | CLS_METHOD_WR,
5386 mirror_peer_set_cluster, &h_mirror_peer_set_cluster);
5387 cls_register_cxx_method(h_class, "mirror_image_list", CLS_METHOD_RD,
5388 mirror_image_list, &h_mirror_image_list);
5389 cls_register_cxx_method(h_class, "mirror_image_get_image_id", CLS_METHOD_RD,
5390 mirror_image_get_image_id,
5391 &h_mirror_image_get_image_id);
5392 cls_register_cxx_method(h_class, "mirror_image_get", CLS_METHOD_RD,
5393 mirror_image_get, &h_mirror_image_get);
5394 cls_register_cxx_method(h_class, "mirror_image_set",
5395 CLS_METHOD_RD | CLS_METHOD_WR,
5396 mirror_image_set, &h_mirror_image_set);
5397 cls_register_cxx_method(h_class, "mirror_image_remove",
5398 CLS_METHOD_RD | CLS_METHOD_WR,
5399 mirror_image_remove, &h_mirror_image_remove);
5400 cls_register_cxx_method(h_class, "mirror_image_status_set",
5401 CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PROMOTE,
5402 mirror_image_status_set, &h_mirror_image_status_set);
5403 cls_register_cxx_method(h_class, "mirror_image_status_remove",
5404 CLS_METHOD_RD | CLS_METHOD_WR,
5405 mirror_image_status_remove,
5406 &h_mirror_image_status_remove);
5407 cls_register_cxx_method(h_class, "mirror_image_status_get", CLS_METHOD_RD,
5408 mirror_image_status_get, &h_mirror_image_status_get);
5409 cls_register_cxx_method(h_class, "mirror_image_status_list", CLS_METHOD_RD,
5410 mirror_image_status_list,
5411 &h_mirror_image_status_list);
5412 cls_register_cxx_method(h_class, "mirror_image_status_get_summary",
5413 CLS_METHOD_RD, mirror_image_status_get_summary,
5414 &h_mirror_image_status_get_summary);
5415 cls_register_cxx_method(h_class, "mirror_image_status_remove_down",
5416 CLS_METHOD_RD | CLS_METHOD_WR,
5417 mirror_image_status_remove_down,
5418 &h_mirror_image_status_remove_down);
5419 cls_register_cxx_method(h_class, "mirror_instances_list", CLS_METHOD_RD,
5420 mirror_instances_list, &h_mirror_instances_list);
5421 cls_register_cxx_method(h_class, "mirror_instances_add",
5422 CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PROMOTE,
5423 mirror_instances_add, &h_mirror_instances_add);
5424 cls_register_cxx_method(h_class, "mirror_instances_remove",
5425 CLS_METHOD_RD | CLS_METHOD_WR,
5426 mirror_instances_remove,
5427 &h_mirror_instances_remove);
5428 /* methods for the consistency groups feature */
5429 cls_register_cxx_method(h_class, "group_create",
5430 CLS_METHOD_RD | CLS_METHOD_WR,
5431 group_create, &h_group_create);
5432 cls_register_cxx_method(h_class, "group_dir_list",
5433 CLS_METHOD_RD,
5434 group_dir_list, &h_group_dir_list);
5435 cls_register_cxx_method(h_class, "group_dir_add",
5436 CLS_METHOD_RD | CLS_METHOD_WR,
5437 group_dir_add, &h_group_dir_add);
5438 cls_register_cxx_method(h_class, "group_dir_remove",
5439 CLS_METHOD_RD | CLS_METHOD_WR,
5440 group_dir_remove, &h_group_dir_remove);
5441 cls_register_cxx_method(h_class, "group_image_remove",
5442 CLS_METHOD_RD | CLS_METHOD_WR,
5443 group_image_remove, &h_group_image_remove);
5444 cls_register_cxx_method(h_class, "group_image_list",
5445 CLS_METHOD_RD | CLS_METHOD_WR,
5446 group_image_list, &h_group_image_list);
5447 cls_register_cxx_method(h_class, "group_image_set",
5448 CLS_METHOD_RD | CLS_METHOD_WR,
5449 group_image_set, &h_group_image_set);
5450 cls_register_cxx_method(h_class, "image_add_group",
5451 CLS_METHOD_RD | CLS_METHOD_WR,
5452 image_add_group, &h_image_add_group);
5453 cls_register_cxx_method(h_class, "image_remove_group",
5454 CLS_METHOD_RD | CLS_METHOD_WR,
5455 image_remove_group, &h_image_remove_group);
5456 cls_register_cxx_method(h_class, "image_get_group",
5457 CLS_METHOD_RD,
5458 image_get_group, &h_image_get_group);
5459
5460 /* rbd_trash object methods */
5461 cls_register_cxx_method(h_class, "trash_add",
5462 CLS_METHOD_RD | CLS_METHOD_WR,
5463 trash_add, &h_trash_add);
5464 cls_register_cxx_method(h_class, "trash_remove",
5465 CLS_METHOD_RD | CLS_METHOD_WR,
5466 trash_remove, &h_trash_remove);
5467 cls_register_cxx_method(h_class, "trash_list",
5468 CLS_METHOD_RD,
5469 trash_list, &h_trash_list);
5470 cls_register_cxx_method(h_class, "trash_get",
5471 CLS_METHOD_RD,
5472 trash_get, &h_trash_get);
5473
5474 return;
5475}