]> git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/api/Mirror.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / librbd / api / Mirror.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/api/Mirror.h"
5 #include "include/rados/librados.hpp"
6 #include "include/stringify.h"
7 #include "common/ceph_json.h"
8 #include "common/dout.h"
9 #include "common/errno.h"
10 #include "cls/rbd/cls_rbd_client.h"
11 #include "librbd/ExclusiveLock.h"
12 #include "librbd/ImageCtx.h"
13 #include "librbd/ImageState.h"
14 #include "librbd/Journal.h"
15 #include "librbd/Utils.h"
16 #include "librbd/api/Image.h"
17 #include "librbd/mirror/DemoteRequest.h"
18 #include "librbd/mirror/DisableRequest.h"
19 #include "librbd/mirror/EnableRequest.h"
20 #include "librbd/mirror/GetInfoRequest.h"
21 #include "librbd/mirror/GetStatusRequest.h"
22 #include "librbd/mirror/PromoteRequest.h"
23 #include "librbd/mirror/Types.h"
24 #include "librbd/MirroringWatcher.h"
25 #include <boost/algorithm/string/trim.hpp>
26 #include <boost/algorithm/string/replace.hpp>
27 #include <boost/scope_exit.hpp>
28
29 #define dout_subsys ceph_subsys_rbd
30 #undef dout_prefix
31 #define dout_prefix *_dout << "librbd::api::Mirror: " << __func__ << ": "
32
33 namespace librbd {
34 namespace api {
35
36 namespace {
37
38 int get_config_key(librados::Rados& rados, const std::string& key,
39 std::string* value) {
40 std::string cmd =
41 "{"
42 "\"prefix\": \"config-key get\", "
43 "\"key\": \"" + key + "\""
44 "}";
45
46 bufferlist in_bl;
47 bufferlist out_bl;
48
49 int r = rados.mon_command(cmd, in_bl, &out_bl, nullptr);
50 if (r == -EINVAL) {
51 return -EOPNOTSUPP;
52 } else if (r < 0 && r != -ENOENT) {
53 return r;
54 }
55
56 *value = out_bl.to_str();
57 return 0;
58 }
59
60 int set_config_key(librados::Rados& rados, const std::string& key,
61 const std::string& value) {
62 std::string cmd;
63 if (value.empty()) {
64 cmd = "{"
65 "\"prefix\": \"config-key rm\", "
66 "\"key\": \"" + key + "\""
67 "}";
68 } else {
69 cmd = "{"
70 "\"prefix\": \"config-key set\", "
71 "\"key\": \"" + key + "\", "
72 "\"val\": \"" + value + "\""
73 "}";
74 }
75 bufferlist in_bl;
76 bufferlist out_bl;
77
78 int r = rados.mon_command(cmd, in_bl, &out_bl, nullptr);
79 if (r == -EINVAL) {
80 return -EOPNOTSUPP;
81 } else if (r < 0) {
82 return r;
83 }
84
85 return 0;
86 }
87
88 std::string get_peer_config_key_name(int64_t pool_id,
89 const std::string& peer_uuid) {
90 return RBD_MIRROR_PEER_CONFIG_KEY_PREFIX + stringify(pool_id) + "/" +
91 peer_uuid;
92 }
93
94 int remove_peer_config_key(librados::IoCtx& io_ctx,
95 const std::string& peer_uuid) {
96 int64_t pool_id = io_ctx.get_id();
97 auto key = get_peer_config_key_name(pool_id, peer_uuid);
98
99 librados::Rados rados(io_ctx);
100 int r = set_config_key(rados, key, "");
101 if (r < 0 && r != -ENOENT && r != -EPERM) {
102 return r;
103 }
104 return 0;
105 }
106
107 int create_bootstrap_user(CephContext* cct, librados::Rados& rados,
108 std::string* peer_client_id, std::string* cephx_key) {
109 ldout(cct, 20) << dendl;
110
111 // retrieve peer CephX user from config-key
112 int r = get_config_key(rados, RBD_MIRROR_PEER_CLIENT_ID_CONFIG_KEY,
113 peer_client_id);
114 if (r == -EACCES) {
115 ldout(cct, 5) << "insufficient permissions to get peer-client-id "
116 << "config-key" << dendl;
117 return r;
118 } else if (r < 0 && r != -ENOENT) {
119 lderr(cct) << "failed to retrieve peer client id key: "
120 << cpp_strerror(r) << dendl;
121 return r;
122 } else if (r == -ENOENT || peer_client_id->empty()) {
123 ldout(cct, 20) << "creating new peer-client-id config-key" << dendl;
124
125 *peer_client_id = "rbd-mirror-peer";
126 r = set_config_key(rados, RBD_MIRROR_PEER_CLIENT_ID_CONFIG_KEY,
127 *peer_client_id);
128 if (r == -EACCES) {
129 ldout(cct, 5) << "insufficient permissions to update peer-client-id "
130 << "config-key" << dendl;
131 return r;
132 } else if (r < 0) {
133 lderr(cct) << "failed to update peer client id key: "
134 << cpp_strerror(r) << dendl;
135 return r;
136 }
137 }
138 ldout(cct, 20) << "peer_client_id=" << *peer_client_id << dendl;
139
140 // create peer client user
141 std::string cmd =
142 R"({)" \
143 R"( "prefix": "auth get-or-create",)" \
144 R"( "entity": "client.)" + *peer_client_id + R"(",)" \
145 R"( "caps": [)" \
146 R"( "mon", "profile rbd-mirror-peer",)" \
147 R"( "osd", "profile rbd"],)" \
148 R"( "format": "json")" \
149 R"(})";
150
151 bufferlist in_bl;
152 bufferlist out_bl;
153
154 r = rados.mon_command(cmd, in_bl, &out_bl, nullptr);
155 if (r == -EINVAL) {
156 ldout(cct, 5) << "caps mismatch for existing user" << dendl;
157 return -EEXIST;
158 } else if (r == -EACCES) {
159 ldout(cct, 5) << "insufficient permissions to create user" << dendl;
160 return r;
161 } else if (r < 0) {
162 lderr(cct) << "failed to create or update RBD mirroring bootstrap user: "
163 << cpp_strerror(r) << dendl;
164 return r;
165 }
166
167 // extract key from response
168 bool json_valid = false;
169 json_spirit::mValue json_root;
170 if(json_spirit::read(out_bl.to_str(), json_root)) {
171 try {
172 auto& json_obj = json_root.get_array()[0].get_obj();
173 *cephx_key = json_obj["key"].get_str();
174 json_valid = true;
175 } catch (std::runtime_error&) {
176 }
177 }
178
179 if (!json_valid) {
180 lderr(cct) << "invalid auth keyring JSON received" << dendl;
181 return -EBADMSG;
182 }
183
184 return 0;
185 }
186
187 int create_bootstrap_peer(CephContext* cct, librados::IoCtx& io_ctx,
188 const std::string& site_name, const std::string& fsid,
189 const std::string& client_id, const std::string& key,
190 const std::string& mon_host,
191 const std::string& cluster1,
192 const std::string& cluster2) {
193 ldout(cct, 20) << dendl;
194
195 std::string peer_uuid;
196 std::vector<mirror_peer_t> peers;
197 int r = Mirror<>::peer_list(io_ctx, &peers);
198 if (r < 0 && r != -ENOENT) {
199 lderr(cct) << "failed to list mirror peers: " << cpp_strerror(r) << dendl;
200 return r;
201 }
202
203 if (peers.empty()) {
204 r = Mirror<>::peer_add(io_ctx, &peer_uuid, site_name,
205 "client." + client_id);
206 if (r < 0) {
207 lderr(cct) << "failed to add " << cluster1 << " peer to "
208 << cluster2 << " " << "cluster: " << cpp_strerror(r) << dendl;
209 return r;
210 }
211 } else if (peers[0].cluster_name != site_name &&
212 peers[0].cluster_name != fsid) {
213 // only support a single peer
214 lderr(cct) << "multiple peers are not currently supported" << dendl;
215 return -EINVAL;
216 } else {
217 peer_uuid = peers[0].uuid;
218
219 if (peers[0].cluster_name != site_name) {
220 r = Mirror<>::peer_set_cluster(io_ctx, peer_uuid, site_name);
221 if (r < 0) {
222 // non-fatal attempt to update site name
223 lderr(cct) << "failed to update peer site name" << dendl;
224 }
225 }
226 }
227
228 Mirror<>::Attributes attributes {
229 {"mon_host", mon_host},
230 {"key", key}};
231 r = Mirror<>::peer_set_attributes(io_ctx, peer_uuid, attributes);
232 if (r < 0) {
233 lderr(cct) << "failed to update " << cluster1 << " cluster connection "
234 << "attributes in " << cluster2 << " cluster: "
235 << cpp_strerror(r) << dendl;
236 return r;
237 }
238
239 return 0;
240 }
241
242 template <typename I>
243 int validate_mirroring_enabled(I *ictx) {
244 CephContext *cct = ictx->cct;
245 cls::rbd::MirrorImage mirror_image_internal;
246 int r = cls_client::mirror_image_get(&ictx->md_ctx, ictx->id,
247 &mirror_image_internal);
248 if (r < 0 && r != -ENOENT) {
249 lderr(cct) << "failed to retrieve mirroring state: " << cpp_strerror(r)
250 << dendl;
251 return r;
252 } else if (mirror_image_internal.state !=
253 cls::rbd::MIRROR_IMAGE_STATE_ENABLED) {
254 lderr(cct) << "mirroring is not currently enabled" << dendl;
255 return -EINVAL;
256 }
257 return 0;
258 }
259
260 int list_mirror_images(librados::IoCtx& io_ctx,
261 std::set<std::string>& mirror_image_ids) {
262 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
263
264 std::string last_read = "";
265 int max_read = 1024;
266 int r;
267 do {
268 std::map<std::string, std::string> mirror_images;
269 r = cls_client::mirror_image_list(&io_ctx, last_read, max_read,
270 &mirror_images);
271 if (r < 0 && r != -ENOENT) {
272 lderr(cct) << "error listing mirrored image directory: "
273 << cpp_strerror(r) << dendl;
274 return r;
275 }
276 for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) {
277 mirror_image_ids.insert(it->first);
278 }
279 if (!mirror_images.empty()) {
280 last_read = mirror_images.rbegin()->first;
281 }
282 r = mirror_images.size();
283 } while (r == max_read);
284
285 return 0;
286 }
287
288 struct C_ImageGetInfo : public Context {
289 mirror_image_info_t *mirror_image_info;
290 Context *on_finish;
291
292 cls::rbd::MirrorImage mirror_image;
293 mirror::PromotionState promotion_state = mirror::PROMOTION_STATE_PRIMARY;
294
295 C_ImageGetInfo(mirror_image_info_t *mirror_image_info, Context *on_finish)
296 : mirror_image_info(mirror_image_info), on_finish(on_finish) {
297 }
298
299 void finish(int r) override {
300 if (r < 0) {
301 on_finish->complete(r);
302 return;
303 }
304
305 mirror_image_info->global_id = mirror_image.global_image_id;
306 mirror_image_info->state = static_cast<rbd_mirror_image_state_t>(
307 mirror_image.state);
308 mirror_image_info->primary = (
309 promotion_state == mirror::PROMOTION_STATE_PRIMARY);
310 on_finish->complete(0);
311 }
312 };
313
314 struct C_ImageGetStatus : public C_ImageGetInfo {
315 std::string image_name;
316 mirror_image_status_t *mirror_image_status;
317
318 cls::rbd::MirrorImageStatus mirror_image_status_internal;
319
320 C_ImageGetStatus(const std::string &image_name,
321 mirror_image_status_t *mirror_image_status,
322 Context *on_finish)
323 : C_ImageGetInfo(&mirror_image_status->info, on_finish),
324 image_name(image_name), mirror_image_status(mirror_image_status) {
325 }
326
327 void finish(int r) override {
328 if (r < 0) {
329 on_finish->complete(r);
330 return;
331 }
332
333 mirror_image_status->name = image_name;
334 mirror_image_status->state = static_cast<mirror_image_status_state_t>(
335 mirror_image_status_internal.state);
336 mirror_image_status->description = mirror_image_status_internal.description;
337 mirror_image_status->last_update =
338 mirror_image_status_internal.last_update.sec();
339 mirror_image_status->up = mirror_image_status_internal.up;
340 C_ImageGetInfo::finish(0);
341 }
342 };
343
344 } // anonymous namespace
345
346 template <typename I>
347 int Mirror<I>::image_enable(I *ictx, bool relax_same_pool_parent_check) {
348 CephContext *cct = ictx->cct;
349 ldout(cct, 20) << "ictx=" << ictx << dendl;
350
351 // TODO
352 if (!ictx->md_ctx.get_namespace().empty()) {
353 lderr(cct) << "namespaces are not supported" << dendl;
354 return -EINVAL;
355 }
356
357 int r = ictx->state->refresh_if_required();
358 if (r < 0) {
359 return r;
360 }
361
362 cls::rbd::MirrorMode mirror_mode;
363 r = cls_client::mirror_mode_get(&ictx->md_ctx, &mirror_mode);
364 if (r < 0) {
365 lderr(cct) << "cannot enable mirroring: failed to retrieve mirror mode: "
366 << cpp_strerror(r) << dendl;
367 return r;
368 }
369
370 if (mirror_mode != cls::rbd::MIRROR_MODE_IMAGE) {
371 lderr(cct) << "cannot enable mirroring in the current pool mirroring mode"
372 << dendl;
373 return -EINVAL;
374 }
375
376 // is mirroring not enabled for the parent?
377 {
378 RWLock::RLocker l(ictx->parent_lock);
379 ImageCtx *parent = ictx->parent;
380 if (parent) {
381 if (relax_same_pool_parent_check &&
382 parent->md_ctx.get_id() == ictx->md_ctx.get_id()) {
383 if (!parent->test_features(RBD_FEATURE_JOURNALING)) {
384 lderr(cct) << "journaling is not enabled for the parent" << dendl;
385 return -EINVAL;
386 }
387 } else {
388 cls::rbd::MirrorImage mirror_image_internal;
389 r = cls_client::mirror_image_get(&(parent->md_ctx), parent->id,
390 &mirror_image_internal);
391 if (r == -ENOENT) {
392 lderr(cct) << "mirroring is not enabled for the parent" << dendl;
393 return -EINVAL;
394 }
395 }
396 }
397 }
398
399 if ((ictx->features & RBD_FEATURE_JOURNALING) == 0) {
400 lderr(cct) << "cannot enable mirroring: journaling is not enabled" << dendl;
401 return -EINVAL;
402 }
403
404 C_SaferCond ctx;
405 auto req = mirror::EnableRequest<ImageCtx>::create(ictx, &ctx);
406 req->send();
407
408 r = ctx.wait();
409 if (r < 0) {
410 lderr(cct) << "cannot enable mirroring: " << cpp_strerror(r) << dendl;
411 return r;
412 }
413
414 return 0;
415 }
416
417 template <typename I>
418 int Mirror<I>::image_disable(I *ictx, bool force) {
419 CephContext *cct = ictx->cct;
420 ldout(cct, 20) << "ictx=" << ictx << dendl;
421
422 int r = ictx->state->refresh_if_required();
423 if (r < 0) {
424 return r;
425 }
426
427 cls::rbd::MirrorMode mirror_mode;
428 r = cls_client::mirror_mode_get(&ictx->md_ctx, &mirror_mode);
429 if (r < 0) {
430 lderr(cct) << "cannot disable mirroring: failed to retrieve pool "
431 "mirroring mode: " << cpp_strerror(r) << dendl;
432 return r;
433 }
434
435 if (mirror_mode != cls::rbd::MIRROR_MODE_IMAGE) {
436 lderr(cct) << "cannot disable mirroring in the current pool mirroring "
437 "mode" << dendl;
438 return -EINVAL;
439 }
440
441 // is mirroring enabled for the child?
442 cls::rbd::MirrorImage mirror_image_internal;
443 r = cls_client::mirror_image_get(&ictx->md_ctx, ictx->id,
444 &mirror_image_internal);
445 if (r == -ENOENT) {
446 // mirroring is not enabled for this image
447 ldout(cct, 20) << "ignoring disable command: mirroring is not enabled for "
448 << "this image" << dendl;
449 return 0;
450 } else if (r == -EOPNOTSUPP) {
451 ldout(cct, 5) << "mirroring not supported by OSD" << dendl;
452 return r;
453 } else if (r < 0) {
454 lderr(cct) << "failed to retrieve mirror image metadata: "
455 << cpp_strerror(r) << dendl;
456 return r;
457 }
458
459 mirror_image_internal.state = cls::rbd::MIRROR_IMAGE_STATE_DISABLING;
460 r = cls_client::mirror_image_set(&ictx->md_ctx, ictx->id,
461 mirror_image_internal);
462 if (r < 0) {
463 lderr(cct) << "cannot disable mirroring: " << cpp_strerror(r) << dendl;
464 return r;
465 } else {
466 bool rollback = false;
467 BOOST_SCOPE_EXIT_ALL(ictx, &mirror_image_internal, &rollback) {
468 if (rollback) {
469 CephContext *cct = ictx->cct;
470 mirror_image_internal.state = cls::rbd::MIRROR_IMAGE_STATE_ENABLED;
471 int r = cls_client::mirror_image_set(&ictx->md_ctx, ictx->id,
472 mirror_image_internal);
473 if (r < 0) {
474 lderr(cct) << "failed to re-enable image mirroring: "
475 << cpp_strerror(r) << dendl;
476 }
477 }
478 };
479
480 {
481 RWLock::RLocker l(ictx->snap_lock);
482 map<librados::snap_t, SnapInfo> snap_info = ictx->snap_info;
483 for (auto &info : snap_info) {
484 cls::rbd::ParentImageSpec parent_spec{ictx->md_ctx.get_id(),
485 ictx->md_ctx.get_namespace(),
486 ictx->id, info.first};
487 std::vector<librbd::linked_image_spec_t> child_images;
488 r = Image<I>::list_children(ictx, parent_spec, &child_images);
489 if (r < 0) {
490 rollback = true;
491 return r;
492 }
493
494 if (child_images.empty()) {
495 continue;
496 }
497
498 librados::IoCtx child_io_ctx;
499 int64_t child_pool_id = -1;
500 for (auto &child_image : child_images){
501 std::string pool = child_image.pool_name;
502 if (child_pool_id == -1 ||
503 child_pool_id != child_image.pool_id ||
504 child_io_ctx.get_namespace() != child_image.pool_namespace) {
505 r = util::create_ioctx(ictx->md_ctx, "child image",
506 child_image.pool_id,
507 child_image.pool_namespace,
508 &child_io_ctx);
509 if (r < 0) {
510 rollback = true;
511 return r;
512 }
513
514 child_pool_id = child_image.pool_id;
515 }
516
517 cls::rbd::MirrorImage mirror_image_internal;
518 r = cls_client::mirror_image_get(&child_io_ctx, child_image.image_id,
519 &mirror_image_internal);
520 if (r != -ENOENT) {
521 rollback = true;
522 lderr(cct) << "mirroring is enabled on one or more children "
523 << dendl;
524 return -EBUSY;
525 }
526 }
527 }
528 }
529
530 C_SaferCond ctx;
531 auto req = mirror::DisableRequest<ImageCtx>::create(ictx, force, true,
532 &ctx);
533 req->send();
534
535 r = ctx.wait();
536 if (r < 0) {
537 lderr(cct) << "cannot disable mirroring: " << cpp_strerror(r) << dendl;
538 rollback = true;
539 return r;
540 }
541 }
542
543 return 0;
544 }
545
546 template <typename I>
547 int Mirror<I>::image_promote(I *ictx, bool force) {
548 CephContext *cct = ictx->cct;
549
550 C_SaferCond ctx;
551 Mirror<I>::image_promote(ictx, force, &ctx);
552 int r = ctx.wait();
553 if (r < 0) {
554 lderr(cct) << "failed to promote image" << dendl;
555 return r;
556 }
557
558 return 0;
559 }
560
561 template <typename I>
562 void Mirror<I>::image_promote(I *ictx, bool force, Context *on_finish) {
563 CephContext *cct = ictx->cct;
564 ldout(cct, 20) << "ictx=" << ictx << ", "
565 << "force=" << force << dendl;
566
567 auto req = mirror::PromoteRequest<>::create(*ictx, force, on_finish);
568 req->send();
569 }
570
571 template <typename I>
572 int Mirror<I>::image_demote(I *ictx) {
573 CephContext *cct = ictx->cct;
574
575 C_SaferCond ctx;
576 Mirror<I>::image_demote(ictx, &ctx);
577 int r = ctx.wait();
578 if (r < 0) {
579 lderr(cct) << "failed to demote image" << dendl;
580 return r;
581 }
582
583 return 0;
584 }
585
586 template <typename I>
587 void Mirror<I>::image_demote(I *ictx, Context *on_finish) {
588 CephContext *cct = ictx->cct;
589 ldout(cct, 20) << "ictx=" << ictx << dendl;
590
591 auto req = mirror::DemoteRequest<>::create(*ictx, on_finish);
592 req->send();
593 }
594
595 template <typename I>
596 int Mirror<I>::image_resync(I *ictx) {
597 CephContext *cct = ictx->cct;
598 ldout(cct, 20) << "ictx=" << ictx << dendl;
599
600 int r = ictx->state->refresh_if_required();
601 if (r < 0) {
602 return r;
603 }
604
605 C_SaferCond tag_owner_ctx;
606 bool is_tag_owner;
607 Journal<I>::is_tag_owner(ictx, &is_tag_owner, &tag_owner_ctx);
608 r = tag_owner_ctx.wait();
609 if (r < 0) {
610 lderr(cct) << "failed to determine tag ownership: " << cpp_strerror(r)
611 << dendl;
612 return r;
613 } else if (is_tag_owner) {
614 lderr(cct) << "image is primary, cannot resync to itself" << dendl;
615 return -EINVAL;
616 }
617
618 // flag the journal indicating that we want to rebuild the local image
619 r = Journal<I>::request_resync(ictx);
620 if (r < 0) {
621 lderr(cct) << "failed to request resync: " << cpp_strerror(r) << dendl;
622 return r;
623 }
624
625 return 0;
626 }
627
628 template <typename I>
629 void Mirror<I>::image_get_info(I *ictx, mirror_image_info_t *mirror_image_info,
630 Context *on_finish) {
631 CephContext *cct = ictx->cct;
632 ldout(cct, 20) << "ictx=" << ictx << dendl;
633
634 auto on_refresh = new FunctionContext(
635 [ictx, mirror_image_info, on_finish](int r) {
636 if (r < 0) {
637 lderr(ictx->cct) << "refresh failed: " << cpp_strerror(r) << dendl;
638 on_finish->complete(r);
639 return;
640 }
641
642 auto ctx = new C_ImageGetInfo(mirror_image_info, on_finish);
643 auto req = mirror::GetInfoRequest<I>::create(*ictx, &ctx->mirror_image,
644 &ctx->promotion_state,
645 ctx);
646 req->send();
647 });
648
649 if (ictx->state->is_refresh_required()) {
650 ictx->state->refresh(on_refresh);
651 } else {
652 on_refresh->complete(0);
653 }
654 }
655
656 template <typename I>
657 int Mirror<I>::image_get_info(I *ictx, mirror_image_info_t *mirror_image_info) {
658 C_SaferCond ctx;
659 image_get_info(ictx, mirror_image_info, &ctx);
660
661 int r = ctx.wait();
662 if (r < 0) {
663 return r;
664 }
665 return 0;
666 }
667
668 template <typename I>
669 void Mirror<I>::image_get_status(I *ictx, mirror_image_status_t *status,
670 Context *on_finish) {
671 CephContext *cct = ictx->cct;
672 ldout(cct, 20) << "ictx=" << ictx << dendl;
673
674 auto ctx = new C_ImageGetStatus(ictx->name, status, on_finish);
675 auto req = mirror::GetStatusRequest<I>::create(
676 *ictx, &ctx->mirror_image_status_internal, &ctx->mirror_image,
677 &ctx->promotion_state, ctx);
678 req->send();
679 }
680
681 template <typename I>
682 int Mirror<I>::image_get_status(I *ictx, mirror_image_status_t *status) {
683 C_SaferCond ctx;
684 image_get_status(ictx, status, &ctx);
685
686 int r = ctx.wait();
687 if (r < 0) {
688 return r;
689 }
690 return 0;
691 }
692
693 template <typename I>
694 int Mirror<I>::image_get_instance_id(I *ictx, std::string *instance_id) {
695 CephContext *cct = ictx->cct;
696 ldout(cct, 20) << "ictx=" << ictx << dendl;
697
698 cls::rbd::MirrorImage mirror_image;
699 int r = cls_client::mirror_image_get(&ictx->md_ctx, ictx->id, &mirror_image);
700 if (r < 0 && r != -ENOENT) {
701 lderr(cct) << "failed to retrieve mirroring state: " << cpp_strerror(r)
702 << dendl;
703 return r;
704 } else if (mirror_image.state != cls::rbd::MIRROR_IMAGE_STATE_ENABLED) {
705 lderr(cct) << "mirroring is not currently enabled" << dendl;
706 return -EINVAL;
707 }
708
709 entity_inst_t instance;
710 r = cls_client::mirror_image_instance_get(&ictx->md_ctx,
711 mirror_image.global_image_id,
712 &instance);
713 if (r < 0) {
714 if (r != -ENOENT && r != -ESTALE) {
715 lderr(cct) << "failed to get mirror image instance: " << cpp_strerror(r)
716 << dendl;
717 }
718 return r;
719 }
720
721 *instance_id = stringify(instance.name.num());
722 return 0;
723 }
724
725 template <typename I>
726 int Mirror<I>::site_name_get(librados::Rados& rados, std::string* name) {
727 CephContext *cct = reinterpret_cast<CephContext *>(rados.cct());
728 ldout(cct, 20) << dendl;
729
730 int r = get_config_key(rados, RBD_MIRROR_SITE_NAME_CONFIG_KEY, name);
731 if (r == -EOPNOTSUPP) {
732 return r;
733 } else if (r == -ENOENT || name->empty()) {
734 // default to the cluster fsid
735 r = rados.cluster_fsid(name);
736 if (r < 0) {
737 lderr(cct) << "failed to retrieve cluster fsid: " << cpp_strerror(r)
738 << dendl;
739 }
740 return r;
741 } else if (r < 0) {
742 lderr(cct) << "failed to retrieve site name: " << cpp_strerror(r)
743 << dendl;
744 return r;
745 }
746
747 return 0;
748 }
749
750 template <typename I>
751 int Mirror<I>::site_name_set(librados::Rados& rados, const std::string& name) {
752 CephContext *cct = reinterpret_cast<CephContext *>(rados.cct());
753
754 std::string site_name{name};
755 boost::algorithm::trim(site_name);
756 ldout(cct, 20) << "site_name=" << site_name << dendl;
757
758 int r = set_config_key(rados, RBD_MIRROR_SITE_NAME_CONFIG_KEY, name);
759 if (r == -EOPNOTSUPP) {
760 return r;
761 } else if (r < 0 && r != -ENOENT) {
762 lderr(cct) << "failed to update site name: " << cpp_strerror(r)
763 << dendl;
764 return r;
765 }
766
767 return 0;
768 }
769
770 template <typename I>
771 int Mirror<I>::mode_get(librados::IoCtx& io_ctx,
772 rbd_mirror_mode_t *mirror_mode) {
773 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
774 ldout(cct, 20) << dendl;
775
776 cls::rbd::MirrorMode mirror_mode_internal;
777 int r = cls_client::mirror_mode_get(&io_ctx, &mirror_mode_internal);
778 if (r < 0) {
779 lderr(cct) << "failed to retrieve mirror mode: " << cpp_strerror(r)
780 << dendl;
781 return r;
782 }
783
784 switch (mirror_mode_internal) {
785 case cls::rbd::MIRROR_MODE_DISABLED:
786 case cls::rbd::MIRROR_MODE_IMAGE:
787 case cls::rbd::MIRROR_MODE_POOL:
788 *mirror_mode = static_cast<rbd_mirror_mode_t>(mirror_mode_internal);
789 break;
790 default:
791 lderr(cct) << "unknown mirror mode ("
792 << static_cast<uint32_t>(mirror_mode_internal) << ")"
793 << dendl;
794 return -EINVAL;
795 }
796 return 0;
797 }
798
799 template <typename I>
800 int Mirror<I>::mode_set(librados::IoCtx& io_ctx,
801 rbd_mirror_mode_t mirror_mode) {
802 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
803 ldout(cct, 20) << dendl;
804
805 // TODO
806 if (!io_ctx.get_namespace().empty()) {
807 lderr(cct) << "namespaces are not supported" << dendl;
808 return -EINVAL;
809 }
810
811 cls::rbd::MirrorMode next_mirror_mode;
812 switch (mirror_mode) {
813 case RBD_MIRROR_MODE_DISABLED:
814 case RBD_MIRROR_MODE_IMAGE:
815 case RBD_MIRROR_MODE_POOL:
816 next_mirror_mode = static_cast<cls::rbd::MirrorMode>(mirror_mode);
817 break;
818 default:
819 lderr(cct) << "unknown mirror mode ("
820 << static_cast<uint32_t>(mirror_mode) << ")" << dendl;
821 return -EINVAL;
822 }
823
824 int r;
825 if (next_mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) {
826 // fail early if pool still has peers registered and attempting to disable
827 std::vector<cls::rbd::MirrorPeer> mirror_peers;
828 r = cls_client::mirror_peer_list(&io_ctx, &mirror_peers);
829 if (r < 0 && r != -ENOENT) {
830 lderr(cct) << "failed to list peers: " << cpp_strerror(r) << dendl;
831 return r;
832 } else if (!mirror_peers.empty()) {
833 lderr(cct) << "mirror peers still registered" << dendl;
834 return -EBUSY;
835 }
836 }
837
838 cls::rbd::MirrorMode current_mirror_mode;
839 r = cls_client::mirror_mode_get(&io_ctx, &current_mirror_mode);
840 if (r < 0) {
841 lderr(cct) << "failed to retrieve mirror mode: " << cpp_strerror(r)
842 << dendl;
843 return r;
844 }
845
846 if (current_mirror_mode == next_mirror_mode) {
847 return 0;
848 } else if (current_mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) {
849 uuid_d uuid_gen;
850 uuid_gen.generate_random();
851 r = cls_client::mirror_uuid_set(&io_ctx, uuid_gen.to_string());
852 if (r < 0) {
853 lderr(cct) << "failed to allocate mirroring uuid: " << cpp_strerror(r)
854 << dendl;
855 return r;
856 }
857 }
858
859 if (current_mirror_mode != cls::rbd::MIRROR_MODE_IMAGE) {
860 r = cls_client::mirror_mode_set(&io_ctx, cls::rbd::MIRROR_MODE_IMAGE);
861 if (r < 0) {
862 lderr(cct) << "failed to set mirror mode to image: "
863 << cpp_strerror(r) << dendl;
864 return r;
865 }
866
867 r = MirroringWatcher<>::notify_mode_updated(io_ctx,
868 cls::rbd::MIRROR_MODE_IMAGE);
869 if (r < 0) {
870 lderr(cct) << "failed to send update notification: " << cpp_strerror(r)
871 << dendl;
872 }
873 }
874
875 if (next_mirror_mode == cls::rbd::MIRROR_MODE_IMAGE) {
876 return 0;
877 }
878
879 if (next_mirror_mode == cls::rbd::MIRROR_MODE_POOL) {
880 map<string, string> images;
881 r = Image<I>::list_images_v2(io_ctx, &images);
882 if (r < 0) {
883 lderr(cct) << "failed listing images: " << cpp_strerror(r) << dendl;
884 return r;
885 }
886
887 for (const auto& img_pair : images) {
888 uint64_t features;
889 uint64_t incompatible_features;
890 r = cls_client::get_features(&io_ctx, util::header_name(img_pair.second),
891 true, &features, &incompatible_features);
892 if (r < 0) {
893 lderr(cct) << "error getting features for image " << img_pair.first
894 << ": " << cpp_strerror(r) << dendl;
895 return r;
896 }
897
898 if ((features & RBD_FEATURE_JOURNALING) != 0) {
899 I *img_ctx = I::create("", img_pair.second, nullptr, io_ctx, false);
900 r = img_ctx->state->open(0);
901 if (r < 0) {
902 lderr(cct) << "error opening image "<< img_pair.first << ": "
903 << cpp_strerror(r) << dendl;
904 return r;
905 }
906
907 r = image_enable(img_ctx, true);
908 int close_r = img_ctx->state->close();
909 if (r < 0) {
910 lderr(cct) << "error enabling mirroring for image "
911 << img_pair.first << ": " << cpp_strerror(r) << dendl;
912 return r;
913 } else if (close_r < 0) {
914 lderr(cct) << "failed to close image " << img_pair.first << ": "
915 << cpp_strerror(close_r) << dendl;
916 return close_r;
917 }
918 }
919 }
920 } else if (next_mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) {
921 while (true) {
922 bool retry_busy = false;
923 bool pending_busy = false;
924
925 std::set<std::string> image_ids;
926 r = list_mirror_images(io_ctx, image_ids);
927 if (r < 0) {
928 lderr(cct) << "failed listing images: " << cpp_strerror(r) << dendl;
929 return r;
930 }
931
932 for (const auto& img_id : image_ids) {
933 if (current_mirror_mode == cls::rbd::MIRROR_MODE_IMAGE) {
934 cls::rbd::MirrorImage mirror_image;
935 r = cls_client::mirror_image_get(&io_ctx, img_id, &mirror_image);
936 if (r < 0 && r != -ENOENT) {
937 lderr(cct) << "failed to retrieve mirroring state for image id "
938 << img_id << ": " << cpp_strerror(r) << dendl;
939 return r;
940 }
941 if (mirror_image.state == cls::rbd::MIRROR_IMAGE_STATE_ENABLED) {
942 lderr(cct) << "failed to disable mirror mode: there are still "
943 << "images with mirroring enabled" << dendl;
944 return -EINVAL;
945 }
946 } else {
947 I *img_ctx = I::create("", img_id, nullptr, io_ctx, false);
948 r = img_ctx->state->open(0);
949 if (r < 0) {
950 lderr(cct) << "error opening image id "<< img_id << ": "
951 << cpp_strerror(r) << dendl;
952 return r;
953 }
954
955 r = image_disable(img_ctx, false);
956 int close_r = img_ctx->state->close();
957 if (r == -EBUSY) {
958 pending_busy = true;
959 } else if (r < 0) {
960 lderr(cct) << "error disabling mirroring for image id " << img_id
961 << cpp_strerror(r) << dendl;
962 return r;
963 } else if (close_r < 0) {
964 lderr(cct) << "failed to close image id " << img_id << ": "
965 << cpp_strerror(close_r) << dendl;
966 return close_r;
967 } else if (pending_busy) {
968 // at least one mirrored image was successfully disabled, so we can
969 // retry any failures caused by busy parent/child relationships
970 retry_busy = true;
971 }
972 }
973 }
974
975 if (!retry_busy && pending_busy) {
976 lderr(cct) << "error disabling mirroring for one or more images"
977 << dendl;
978 return -EBUSY;
979 } else if (!retry_busy) {
980 break;
981 }
982 }
983 }
984
985 r = cls_client::mirror_mode_set(&io_ctx, next_mirror_mode);
986 if (r < 0) {
987 lderr(cct) << "failed to set mirror mode: " << cpp_strerror(r) << dendl;
988 return r;
989 }
990
991 r = MirroringWatcher<>::notify_mode_updated(io_ctx, next_mirror_mode);
992 if (r < 0) {
993 lderr(cct) << "failed to send update notification: " << cpp_strerror(r)
994 << dendl;
995 }
996 return 0;
997 }
998
999 template <typename I>
1000 int Mirror<I>::peer_bootstrap_create(librados::IoCtx& io_ctx,
1001 std::string* token) {
1002 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
1003 ldout(cct, 20) << dendl;
1004
1005 auto mirror_mode = cls::rbd::MIRROR_MODE_DISABLED;
1006 int r = cls_client::mirror_mode_get(&io_ctx, &mirror_mode);
1007 if (r < 0 && r != -ENOENT) {
1008 lderr(cct) << "failed to retrieve mirroring mode: " << cpp_strerror(r)
1009 << dendl;
1010 return r;
1011 } else if (mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) {
1012 return -EINVAL;
1013 }
1014
1015 // retrieve the cluster fsid
1016 std::string fsid;
1017 librados::Rados rados(io_ctx);
1018 r = rados.cluster_fsid(&fsid);
1019 if (r < 0) {
1020 lderr(cct) << "failed to retrieve cluster fsid: " << cpp_strerror(r)
1021 << dendl;
1022 return r;
1023 }
1024
1025 std::string peer_client_id;
1026 std::string cephx_key;
1027 r = create_bootstrap_user(cct, rados, &peer_client_id, &cephx_key);
1028 if (r < 0) {
1029 return r;
1030 }
1031
1032 std::string mon_host = cct->_conf.get_val<std::string>("mon_host");
1033 ldout(cct, 20) << "mon_host=" << mon_host << dendl;
1034
1035 // format the token response
1036 bufferlist token_bl;
1037 token_bl.append(
1038 R"({)" \
1039 R"("fsid":")" + fsid + R"(",)" + \
1040 R"("client_id":")" + peer_client_id + R"(",)" + \
1041 R"("key":")" + cephx_key + R"(",)" + \
1042 R"("mon_host":")" + \
1043 boost::replace_all_copy(mon_host, "\"", "\\\"") + R"(")" + \
1044 R"(})");
1045 ldout(cct, 20) << "token=" << token_bl.to_str() << dendl;
1046
1047 bufferlist base64_bl;
1048 token_bl.encode_base64(base64_bl);
1049 *token = base64_bl.to_str();
1050
1051 return 0;
1052 }
1053
1054 template <typename I>
1055 int Mirror<I>::peer_bootstrap_import(librados::IoCtx& io_ctx,
1056 rbd_mirror_peer_direction_t direction,
1057 const std::string& token) {
1058 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
1059 ldout(cct, 20) << dendl;
1060
1061 if (direction != RBD_MIRROR_PEER_DIRECTION_RX &&
1062 direction != RBD_MIRROR_PEER_DIRECTION_RX_TX) {
1063 lderr(cct) << "invalid mirror peer direction" << dendl;
1064 return -EINVAL;
1065 }
1066
1067 bufferlist token_bl;
1068 try {
1069 bufferlist base64_bl;
1070 base64_bl.append(token);
1071 token_bl.decode_base64(base64_bl);
1072 } catch (buffer::error& err) {
1073 lderr(cct) << "failed to decode base64" << dendl;
1074 return -EINVAL;
1075 }
1076
1077 ldout(cct, 20) << "token=" << token_bl.to_str() << dendl;
1078
1079 bool json_valid = false;
1080 std::string expected_remote_fsid;
1081 std::string remote_client_id;
1082 std::string remote_key;
1083 std::string remote_mon_host;
1084
1085 json_spirit::mValue json_root;
1086 if(json_spirit::read(token_bl.to_str(), json_root)) {
1087 try {
1088 auto& json_obj = json_root.get_obj();
1089 expected_remote_fsid = json_obj["fsid"].get_str();
1090 remote_client_id = json_obj["client_id"].get_str();
1091 remote_key = json_obj["key"].get_str();
1092 remote_mon_host = json_obj["mon_host"].get_str();
1093 json_valid = true;
1094 } catch (std::runtime_error&) {
1095 }
1096 }
1097
1098 if (!json_valid) {
1099 lderr(cct) << "invalid bootstrap token JSON received" << dendl;
1100 return -EINVAL;
1101 }
1102
1103 // sanity check import process
1104 std::string local_fsid;
1105 librados::Rados rados(io_ctx);
1106 int r = rados.cluster_fsid(&local_fsid);
1107 if (r < 0) {
1108 lderr(cct) << "failed to retrieve cluster fsid: " << cpp_strerror(r)
1109 << dendl;
1110 return r;
1111 }
1112
1113 std::string local_site_name;
1114 r = site_name_get(rados, &local_site_name);
1115 if (r < 0) {
1116 lderr(cct) << "failed to retrieve cluster site name: " << cpp_strerror(r)
1117 << dendl;
1118 return r;
1119 }
1120
1121 // attempt to connect to remote cluster
1122 librados::Rados remote_rados;
1123 remote_rados.init(remote_client_id.c_str());
1124
1125 auto remote_cct = reinterpret_cast<CephContext*>(remote_rados.cct());
1126 remote_cct->_conf.set_val("mon_host", remote_mon_host);
1127 remote_cct->_conf.set_val("key", remote_key);
1128
1129 r = remote_rados.connect();
1130 if (r < 0) {
1131 lderr(cct) << "failed to connect to peer cluster: " << cpp_strerror(r)
1132 << dendl;
1133 return r;
1134 }
1135
1136 std::string remote_fsid;
1137 r = remote_rados.cluster_fsid(&remote_fsid);
1138 if (r < 0) {
1139 lderr(cct) << "failed to retrieve remote cluster fsid: "
1140 << cpp_strerror(r) << dendl;
1141 return r;
1142 } else if (local_fsid == remote_fsid) {
1143 lderr(cct) << "cannot import token for local cluster" << dendl;
1144 return -EINVAL;
1145 } else if (expected_remote_fsid != remote_fsid) {
1146 lderr(cct) << "unexpected remote cluster fsid" << dendl;
1147 return -EINVAL;
1148 }
1149
1150 std::string remote_site_name;
1151 r = site_name_get(remote_rados, &remote_site_name);
1152 if (r < 0) {
1153 lderr(cct) << "failed to retrieve remote cluster site name: "
1154 << cpp_strerror(r) << dendl;
1155 return r;
1156 } else if (local_site_name == remote_site_name) {
1157 lderr(cct) << "cannot import token for duplicate site name" << dendl;
1158 return -EINVAL;
1159 }
1160
1161 librados::IoCtx remote_io_ctx;
1162 r = remote_rados.ioctx_create(io_ctx.get_pool_name().c_str(), remote_io_ctx);
1163 if (r == -ENOENT) {
1164 ldout(cct, 10) << "remote pool does not exist" << dendl;
1165 return r;
1166 } else if (r < 0) {
1167 lderr(cct) << "failed to open remote pool '" << io_ctx.get_pool_name()
1168 << "': " << cpp_strerror(r) << dendl;
1169 return r;
1170 }
1171
1172 auto remote_mirror_mode = cls::rbd::MIRROR_MODE_DISABLED;
1173 r = cls_client::mirror_mode_get(&remote_io_ctx, &remote_mirror_mode);
1174 if (r < 0 && r != -ENOENT) {
1175 lderr(cct) << "failed to retrieve remote mirroring mode: "
1176 << cpp_strerror(r) << dendl;
1177 return r;
1178 } else if (remote_mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) {
1179 return -ENOSYS;
1180 }
1181
1182 auto local_mirror_mode = cls::rbd::MIRROR_MODE_DISABLED;
1183 r = cls_client::mirror_mode_get(&io_ctx, &local_mirror_mode);
1184 if (r < 0 && r != -ENOENT) {
1185 lderr(cct) << "failed to retrieve local mirroring mode: " << cpp_strerror(r)
1186 << dendl;
1187 return r;
1188 } else if (local_mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) {
1189 // copy mirror mode from remote peer
1190 r = mode_set(io_ctx, static_cast<rbd_mirror_mode_t>(remote_mirror_mode));
1191 if (r < 0) {
1192 return r;
1193 }
1194 }
1195
1196 if (direction == RBD_MIRROR_PEER_DIRECTION_RX_TX) {
1197 // create a local mirror peer user and export it to the remote cluster
1198 std::string local_client_id;
1199 std::string local_key;
1200 r = create_bootstrap_user(cct, rados, &local_client_id, &local_key);
1201 if (r < 0) {
1202 return r;
1203 }
1204
1205 std::string local_mon_host = cct->_conf.get_val<std::string>("mon_host");
1206
1207 // create local cluster peer in remote cluster
1208 r = create_bootstrap_peer(cct, remote_io_ctx, local_site_name, local_fsid,
1209 local_client_id, local_key, local_mon_host,
1210 "local", "remote");
1211 if (r < 0) {
1212 return r;
1213 }
1214 }
1215
1216 // create remote cluster peer in local cluster
1217 r = create_bootstrap_peer(cct, io_ctx, remote_site_name, remote_fsid,
1218 remote_client_id, remote_key, remote_mon_host,
1219 "remote", "local");
1220 if (r < 0) {
1221 return r;
1222 }
1223
1224 return 0;
1225 }
1226
1227 template <typename I>
1228 int Mirror<I>::peer_add(librados::IoCtx& io_ctx, std::string *uuid,
1229 const std::string &cluster_name,
1230 const std::string &client_name) {
1231 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
1232 ldout(cct, 20) << "name=" << cluster_name << ", "
1233 << "client=" << client_name << dendl;
1234
1235 // TODO
1236 if (!io_ctx.get_namespace().empty()) {
1237 lderr(cct) << "namespaces are not supported" << dendl;
1238 return -EINVAL;
1239 }
1240
1241 if (cct->_conf->cluster == cluster_name) {
1242 lderr(cct) << "cannot add self as remote peer" << dendl;
1243 return -EINVAL;
1244 }
1245
1246 int r;
1247 do {
1248 uuid_d uuid_gen;
1249 uuid_gen.generate_random();
1250
1251 *uuid = uuid_gen.to_string();
1252 r = cls_client::mirror_peer_add(&io_ctx, *uuid, cluster_name,
1253 client_name);
1254 if (r == -ESTALE) {
1255 ldout(cct, 5) << "duplicate UUID detected, retrying" << dendl;
1256 } else if (r < 0) {
1257 lderr(cct) << "failed to add mirror peer '" << uuid << "': "
1258 << cpp_strerror(r) << dendl;
1259 return r;
1260 }
1261 } while (r == -ESTALE);
1262 return 0;
1263 }
1264
1265 template <typename I>
1266 int Mirror<I>::peer_remove(librados::IoCtx& io_ctx, const std::string &uuid) {
1267 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
1268 ldout(cct, 20) << "uuid=" << uuid << dendl;
1269
1270 int r = remove_peer_config_key(io_ctx, uuid);
1271 if (r < 0) {
1272 lderr(cct) << "failed to remove peer attributes '" << uuid << "': "
1273 << cpp_strerror(r) << dendl;
1274 return r;
1275 }
1276
1277 r = cls_client::mirror_peer_remove(&io_ctx, uuid);
1278 if (r < 0 && r != -ENOENT) {
1279 lderr(cct) << "failed to remove peer '" << uuid << "': "
1280 << cpp_strerror(r) << dendl;
1281 return r;
1282 }
1283 return 0;
1284 }
1285
1286 template <typename I>
1287 int Mirror<I>::peer_list(librados::IoCtx& io_ctx,
1288 std::vector<mirror_peer_t> *peers) {
1289 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
1290 ldout(cct, 20) << dendl;
1291
1292 std::vector<cls::rbd::MirrorPeer> mirror_peers;
1293 int r = cls_client::mirror_peer_list(&io_ctx, &mirror_peers);
1294 if (r < 0 && r != -ENOENT) {
1295 lderr(cct) << "failed to list peers: " << cpp_strerror(r) << dendl;
1296 return r;
1297 }
1298
1299 peers->clear();
1300 peers->reserve(mirror_peers.size());
1301 for (auto &mirror_peer : mirror_peers) {
1302 mirror_peer_t peer;
1303 peer.uuid = mirror_peer.uuid;
1304 peer.cluster_name = mirror_peer.cluster_name;
1305 peer.client_name = mirror_peer.client_name;
1306 peers->push_back(peer);
1307 }
1308 return 0;
1309 }
1310
1311 template <typename I>
1312 int Mirror<I>::peer_set_client(librados::IoCtx& io_ctx, const std::string &uuid,
1313 const std::string &client_name) {
1314 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
1315 ldout(cct, 20) << "uuid=" << uuid << ", "
1316 << "client=" << client_name << dendl;
1317
1318 int r = cls_client::mirror_peer_set_client(&io_ctx, uuid, client_name);
1319 if (r < 0) {
1320 lderr(cct) << "failed to update client '" << uuid << "': "
1321 << cpp_strerror(r) << dendl;
1322 return r;
1323 }
1324 return 0;
1325 }
1326
1327 template <typename I>
1328 int Mirror<I>::peer_set_cluster(librados::IoCtx& io_ctx,
1329 const std::string &uuid,
1330 const std::string &cluster_name) {
1331 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
1332 ldout(cct, 20) << "uuid=" << uuid << ", "
1333 << "cluster=" << cluster_name << dendl;
1334
1335 if (cct->_conf->cluster == cluster_name) {
1336 lderr(cct) << "cannot set self as remote peer" << dendl;
1337 return -EINVAL;
1338 }
1339
1340 int r = cls_client::mirror_peer_set_cluster(&io_ctx, uuid, cluster_name);
1341 if (r < 0) {
1342 lderr(cct) << "failed to update cluster '" << uuid << "': "
1343 << cpp_strerror(r) << dendl;
1344 return r;
1345 }
1346 return 0;
1347 }
1348
1349 template <typename I>
1350 int Mirror<I>::peer_get_attributes(librados::IoCtx& io_ctx,
1351 const std::string &uuid,
1352 Attributes* attributes) {
1353 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
1354 ldout(cct, 20) << "uuid=" << uuid << dendl;
1355
1356 attributes->clear();
1357
1358 librados::Rados rados(io_ctx);
1359 std::string value;
1360 int r = get_config_key(rados, get_peer_config_key_name(io_ctx.get_id(), uuid),
1361 &value);
1362 if (r == -ENOENT || value.empty()) {
1363 return -ENOENT;
1364 } else if (r < 0) {
1365 lderr(cct) << "failed to retrieve peer attributes: " << cpp_strerror(r)
1366 << dendl;
1367 return r;
1368 }
1369
1370 bool json_valid = false;
1371 json_spirit::mValue json_root;
1372 if(json_spirit::read(value, json_root)) {
1373 try {
1374 auto& json_obj = json_root.get_obj();
1375 for (auto& pairs : json_obj) {
1376 (*attributes)[pairs.first] = pairs.second.get_str();
1377 }
1378 json_valid = true;
1379 } catch (std::runtime_error&) {
1380 }
1381 }
1382
1383 if (!json_valid) {
1384 lderr(cct) << "invalid peer attributes JSON received" << dendl;
1385 return -EINVAL;
1386 }
1387 return 0;
1388 }
1389
1390 template <typename I>
1391 int Mirror<I>::peer_set_attributes(librados::IoCtx& io_ctx,
1392 const std::string &uuid,
1393 const Attributes& attributes) {
1394 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
1395 ldout(cct, 20) << "uuid=" << uuid << ", "
1396 << "attributes=" << attributes << dendl;
1397
1398 std::vector<mirror_peer_t> mirror_peers;
1399 int r = peer_list(io_ctx, &mirror_peers);
1400 if (r < 0) {
1401 return r;
1402 }
1403
1404 if (std::find_if(mirror_peers.begin(), mirror_peers.end(),
1405 [&uuid](const librbd::mirror_peer_t& peer) {
1406 return uuid == peer.uuid;
1407 }) == mirror_peers.end()) {
1408 ldout(cct, 5) << "mirror peer uuid " << uuid << " does not exist" << dendl;
1409 return -ENOENT;
1410 }
1411
1412 std::stringstream ss;
1413 ss << "{";
1414 for (auto& pair : attributes) {
1415 ss << "\\\"" << pair.first << "\\\": "
1416 << "\\\"" << pair.second << "\\\"";
1417 if (&pair != &(*attributes.rbegin())) {
1418 ss << ", ";
1419 }
1420 }
1421 ss << "}";
1422
1423 librados::Rados rados(io_ctx);
1424 r = set_config_key(rados, get_peer_config_key_name(io_ctx.get_id(), uuid),
1425 ss.str());
1426 if (r < 0 && r != -ENOENT) {
1427 lderr(cct) << "failed to update peer attributes: " << cpp_strerror(r)
1428 << dendl;
1429 return r;
1430 }
1431
1432 return 0;
1433 }
1434
1435 template <typename I>
1436 int Mirror<I>::image_status_list(librados::IoCtx& io_ctx,
1437 const std::string &start_id, size_t max,
1438 IdToMirrorImageStatus *images) {
1439 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
1440 int r;
1441
1442 map<string, string> id_to_name;
1443 {
1444 map<string, string> name_to_id;
1445 r = Image<I>::list_images_v2(io_ctx, &name_to_id);
1446 if (r < 0) {
1447 return r;
1448 }
1449 for (auto it : name_to_id) {
1450 id_to_name[it.second] = it.first;
1451 }
1452 }
1453
1454 map<std::string, cls::rbd::MirrorImage> images_;
1455 map<std::string, cls::rbd::MirrorImageStatus> statuses_;
1456
1457 r = librbd::cls_client::mirror_image_status_list(&io_ctx, start_id, max,
1458 &images_, &statuses_);
1459 if (r < 0 && r != -ENOENT) {
1460 lderr(cct) << "failed to list mirror image statuses: "
1461 << cpp_strerror(r) << dendl;
1462 return r;
1463 }
1464
1465 cls::rbd::MirrorImageStatus unknown_status(
1466 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN, "status not found");
1467
1468 for (auto it = images_.begin(); it != images_.end(); ++it) {
1469 auto &image_id = it->first;
1470 auto &info = it->second;
1471 if (info.state == cls::rbd::MIRROR_IMAGE_STATE_DISABLED) {
1472 continue;
1473 }
1474
1475 auto &image_name = id_to_name[image_id];
1476 if (image_name.empty()) {
1477 lderr(cct) << "failed to find image name for image " << image_id << ", "
1478 << "using image id as name" << dendl;
1479 image_name = image_id;
1480 }
1481 auto s_it = statuses_.find(image_id);
1482 auto &s = s_it != statuses_.end() ? s_it->second : unknown_status;
1483 (*images)[image_id] = mirror_image_status_t{
1484 image_name,
1485 mirror_image_info_t{
1486 info.global_image_id,
1487 static_cast<mirror_image_state_t>(info.state),
1488 false}, // XXX: To set "primary" right would require an additional call.
1489 static_cast<mirror_image_status_state_t>(s.state),
1490 s.description,
1491 s.last_update.sec(),
1492 s.up};
1493 }
1494
1495 return 0;
1496 }
1497
1498 template <typename I>
1499 int Mirror<I>::image_status_summary(librados::IoCtx& io_ctx,
1500 MirrorImageStatusStates *states) {
1501 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
1502
1503 std::map<cls::rbd::MirrorImageStatusState, int> states_;
1504 int r = cls_client::mirror_image_status_get_summary(&io_ctx, &states_);
1505 if (r < 0 && r != -ENOENT) {
1506 lderr(cct) << "failed to get mirror status summary: "
1507 << cpp_strerror(r) << dendl;
1508 return r;
1509 }
1510 for (auto &s : states_) {
1511 (*states)[static_cast<mirror_image_status_state_t>(s.first)] = s.second;
1512 }
1513 return 0;
1514 }
1515
1516 template <typename I>
1517 int Mirror<I>::image_instance_id_list(
1518 librados::IoCtx& io_ctx, const std::string &start_image_id, size_t max,
1519 std::map<std::string, std::string> *instance_ids) {
1520 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
1521 std::map<std::string, entity_inst_t> instances;
1522
1523 int r = librbd::cls_client::mirror_image_instance_list(
1524 &io_ctx, start_image_id, max, &instances);
1525 if (r < 0 && r != -ENOENT) {
1526 lderr(cct) << "failed to list mirror image instances: " << cpp_strerror(r)
1527 << dendl;
1528 return r;
1529 }
1530
1531 for (auto it : instances) {
1532 (*instance_ids)[it.first] = stringify(it.second.name.num());
1533 }
1534
1535 return 0;
1536 }
1537
1538 } // namespace api
1539 } // namespace librbd
1540
1541 template class librbd::api::Mirror<librbd::ImageCtx>;