]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/test/cls_rbd/test_cls_rbd.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / test / cls_rbd / test_cls_rbd.cc
index 8d03aadc1601ae8f2140c44647d7d461a3f3ea6c..ef753b749809be5587d14f4ca5c15ce7a97877db 100644 (file)
@@ -1,6 +1,7 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
+#include "include/compat.h"
 #include "common/ceph_context.h"
 #include "common/config.h"
 #include "common/snap_types.h"
@@ -26,6 +27,9 @@
 
 using namespace std;
 using namespace librbd::cls_client;
+using cls::rbd::MIRROR_PEER_DIRECTION_RX;
+using cls::rbd::MIRROR_PEER_DIRECTION_TX;
+using cls::rbd::MIRROR_PEER_DIRECTION_RX_TX;
 using ::librbd::ParentImageInfo;
 using ceph::encode;
 using ceph::decode;
@@ -179,6 +183,74 @@ TEST_F(TestClsRbd, copyup)
   ioctx.close();
 }
 
+TEST_F(TestClsRbd, sparse_copyup)
+{
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+  string oid = get_temp_image_name();
+  ioctx.remove(oid);
+
+  bool sparse_read_supported = is_sparse_read_supported(ioctx, oid);
+
+  // copyup of 0-len nonexistent object should create new 0-len object
+  uint64_t size;
+  ASSERT_EQ(-ENOENT, ioctx.stat(oid, &size, nullptr));
+  std::map<uint64_t, uint64_t> m;
+  bufferlist inbl;
+  ASSERT_EQ(0, sparse_copyup(&ioctx, oid, m, inbl));
+  ASSERT_EQ(0, ioctx.stat(oid, &size, nullptr));
+  ASSERT_EQ(0U, size);
+
+  // create some data to write
+  inbl.append(std::string(4096, '1'));
+  inbl.append(std::string(4096, '2'));
+  m = {{1024, 4096}, {8192, 4096}};
+
+  // copyup to nonexistent object should create new object
+  ioctx.remove(oid);
+  ASSERT_EQ(-ENOENT, ioctx.remove(oid));
+  ASSERT_EQ(0, sparse_copyup(&ioctx, oid, m, inbl));
+  // and its contents should match
+  bufferlist outbl;
+  bufferlist expected_outbl;
+  expected_outbl.append(std::string(1024, '\0'));
+  expected_outbl.append(std::string(4096, '1'));
+  expected_outbl.append(std::string(8192 - 4096 - 1024, '\0'));
+  expected_outbl.append(std::string(4096, '2'));
+  ASSERT_EQ((int)expected_outbl.length(),
+            ioctx.read(oid, outbl, expected_outbl.length() + 1, 0));
+  ASSERT_TRUE(outbl.contents_equal(expected_outbl));
+  std::map<uint64_t, uint64_t> expected_m;
+  if (sparse_read_supported) {
+    expected_m = m;
+    expected_outbl = inbl;
+  } else {
+    expected_m = {{0, expected_outbl.length()}};
+  }
+  m.clear();
+  outbl.clear();
+  ASSERT_EQ((int)expected_m.size(), ioctx.sparse_read(oid, m, outbl, 65536, 0));
+  ASSERT_EQ(m, expected_m);
+  ASSERT_TRUE(outbl.contents_equal(expected_outbl));
+
+  // now send different data, but with a preexisting object
+  bufferlist inbl2;
+  inbl2.append(std::string(1024, 'X'));
+
+  // should still succeed
+  ASSERT_EQ(0, sparse_copyup(&ioctx, oid, {{0, 1024}}, inbl2));
+  // but contents should not have changed
+  m.clear();
+  outbl.clear();
+  ASSERT_EQ((int)expected_m.size(), ioctx.sparse_read(oid, m, outbl, 65536, 0));
+  ASSERT_EQ(m, expected_m);
+  ASSERT_TRUE(outbl.contents_equal(expected_outbl));
+
+  ASSERT_EQ(0, ioctx.remove(oid));
+  ioctx.close();
+}
+
 TEST_F(TestClsRbd, get_and_set_id)
 {
   librados::IoCtx ioctx;
@@ -652,6 +724,10 @@ TEST_F(TestClsRbd, snapshot_limits)
 
   ASSERT_EQ(0, create_image(&ioctx, oid, 0, 22, RBD_FEATURE_LAYERING, oid, -1));
 
+  // if snapshot doesn't set limit, the limit is UINT64_MAX
+  ASSERT_EQ(0, snapshot_get_limit(&ioctx, oid, &limit));
+  ASSERT_EQ(UINT64_MAX, limit);
+
   snapshot_set_limit(&op, 2);
 
   ASSERT_EQ(0, ioctx.operate(oid, &op));
@@ -1049,6 +1125,9 @@ TEST_F(TestClsRbd, snapshots)
   ASSERT_EQ(0u, snapc.snaps[1]);
   ASSERT_EQ(1u, snapc.seq);
 
+  // snap id less than current snap seq
+  ASSERT_EQ(-ESTALE, snapshot_add(&ioctx, oid, 0, "snap3"));
+
   ASSERT_EQ(0, snapshot_get(&ioctx, oid, 1, &snap));
   ASSERT_EQ("snap2", snap.name);
   ASSERT_EQ(userSnapNamespace, snap.snapshot_namespace);
@@ -1528,7 +1607,10 @@ TEST_F(TestClsRbd, mirror) {
 
   std::string uuid;
   ASSERT_EQ(-ENOENT, mirror_uuid_get(&ioctx, &uuid));
-  ASSERT_EQ(-EINVAL, mirror_peer_add(&ioctx, "uuid1", "cluster1", "client"));
+  ASSERT_EQ(-EINVAL, mirror_peer_add(&ioctx, {"uuid1", MIRROR_PEER_DIRECTION_RX,
+                                              "siteA", "client",
+                                              "mirror uuid"}));
+  ASSERT_EQ(-EINVAL, mirror_peer_ping(&ioctx, "siteA", "mirror uuid"));
 
   cls::rbd::MirrorMode mirror_mode;
   ASSERT_EQ(0, mirror_mode_get(&ioctx, &mirror_mode));
@@ -1550,39 +1632,92 @@ TEST_F(TestClsRbd, mirror) {
   ASSERT_EQ(0, mirror_mode_get(&ioctx, &mirror_mode));
   ASSERT_EQ(cls::rbd::MIRROR_MODE_POOL, mirror_mode);
 
-  ASSERT_EQ(-EINVAL, mirror_peer_add(&ioctx, "mirror-uuid", "cluster1", "client"));
-  ASSERT_EQ(0, mirror_peer_add(&ioctx, "uuid1", "cluster1", "client"));
-  ASSERT_EQ(0, mirror_peer_add(&ioctx, "uuid2", "cluster2", "admin"));
-  ASSERT_EQ(-ESTALE, mirror_peer_add(&ioctx, "uuid2", "cluster3", "foo"));
-  ASSERT_EQ(-EEXIST, mirror_peer_add(&ioctx, "uuid3", "cluster1", "foo"));
-  ASSERT_EQ(0, mirror_peer_add(&ioctx, "uuid3", "cluster3", "admin"));
-  ASSERT_EQ(-EEXIST, mirror_peer_add(&ioctx, "uuid4", "cluster3", "admin"));
+  ASSERT_EQ(-EINVAL, mirror_peer_add(&ioctx, {"mirror-uuid",
+                                              MIRROR_PEER_DIRECTION_RX, "siteA",
+                                              "client", ""}));
+  ASSERT_EQ(-EINVAL, mirror_peer_add(&ioctx, {"uuid1", MIRROR_PEER_DIRECTION_TX,
+                                              "siteA", "client",
+                                              "mirror uuid"}));
+  ASSERT_EQ(0, mirror_peer_add(&ioctx, {"uuid1", MIRROR_PEER_DIRECTION_RX,
+                                        "siteA", "client", "fsidA"}));
+  ASSERT_EQ(0, mirror_peer_add(&ioctx, {"uuid2", MIRROR_PEER_DIRECTION_RX,
+                                        "siteB", "admin", ""}));
+  ASSERT_EQ(-ESTALE, mirror_peer_add(&ioctx, {"uuid2", MIRROR_PEER_DIRECTION_RX,
+                                              "siteC", "foo", ""}));
+  ASSERT_EQ(-EEXIST, mirror_peer_add(&ioctx, {"uuid3", MIRROR_PEER_DIRECTION_RX,
+                                              "siteA", "foo", ""}));
+  ASSERT_EQ(-EEXIST, mirror_peer_add(&ioctx, {"uuid3", MIRROR_PEER_DIRECTION_RX,
+                                              "siteC", "client", "fsidA"}));
+  ASSERT_EQ(0, mirror_peer_add(&ioctx, {"uuid3", MIRROR_PEER_DIRECTION_RX,
+                                        "siteC", "admin", ""}));
+  ASSERT_EQ(0, mirror_peer_add(&ioctx, {"uuid4", MIRROR_PEER_DIRECTION_RX,
+                                        "siteD", "admin", ""}));
 
   ASSERT_EQ(0, mirror_peer_list(&ioctx, &peers));
   std::vector<cls::rbd::MirrorPeer> expected_peers = {
-    {"uuid1", "cluster1", "client", -1},
-    {"uuid2", "cluster2", "admin", -1},
-    {"uuid3", "cluster3", "admin", -1}};
+    {"uuid1", MIRROR_PEER_DIRECTION_RX, "siteA", "client", "fsidA"},
+    {"uuid2", MIRROR_PEER_DIRECTION_RX, "siteB", "admin", ""},
+    {"uuid3", MIRROR_PEER_DIRECTION_RX, "siteC", "admin", ""},
+    {"uuid4", MIRROR_PEER_DIRECTION_RX, "siteD", "admin", ""}};
   ASSERT_EQ(expected_peers, peers);
 
   ASSERT_EQ(0, mirror_peer_remove(&ioctx, "uuid5"));
+  ASSERT_EQ(0, mirror_peer_remove(&ioctx, "uuid4"));
   ASSERT_EQ(0, mirror_peer_remove(&ioctx, "uuid2"));
 
+  ASSERT_EQ(0, mirror_peer_list(&ioctx, &peers));
+  expected_peers = {
+    {"uuid1", MIRROR_PEER_DIRECTION_RX, "siteA", "client", "fsidA"},
+    {"uuid3", MIRROR_PEER_DIRECTION_RX, "siteC", "admin", ""}};
+  ASSERT_EQ(expected_peers, peers);
+
   ASSERT_EQ(-ENOENT, mirror_peer_set_client(&ioctx, "uuid4", "new client"));
   ASSERT_EQ(0, mirror_peer_set_client(&ioctx, "uuid1", "new client"));
 
-  ASSERT_EQ(-ENOENT, mirror_peer_set_cluster(&ioctx, "uuid4", "new cluster"));
-  ASSERT_EQ(0, mirror_peer_set_cluster(&ioctx, "uuid3", "new cluster"));
+  ASSERT_EQ(-ENOENT, mirror_peer_set_cluster(&ioctx, "uuid4", "new site"));
+  ASSERT_EQ(0, mirror_peer_set_cluster(&ioctx, "uuid3", "new site"));
 
   ASSERT_EQ(0, mirror_peer_list(&ioctx, &peers));
   expected_peers = {
-    {"uuid1", "cluster1", "new client", -1},
-    {"uuid3", "new cluster", "admin", -1}};
+    {"uuid1", MIRROR_PEER_DIRECTION_RX, "siteA", "new client", "fsidA"},
+    {"uuid3", MIRROR_PEER_DIRECTION_RX, "new site", "admin", ""}};
   ASSERT_EQ(expected_peers, peers);
-  ASSERT_EQ(-EBUSY, mirror_mode_set(&ioctx, cls::rbd::MIRROR_MODE_DISABLED));
 
-  ASSERT_EQ(0, mirror_peer_remove(&ioctx, "uuid3"));
   ASSERT_EQ(0, mirror_peer_remove(&ioctx, "uuid1"));
+
+  ASSERT_EQ(0, mirror_peer_list(&ioctx, &peers));
+  expected_peers = {
+    {"uuid3", MIRROR_PEER_DIRECTION_RX, "new site", "admin", ""}};
+  ASSERT_EQ(expected_peers, peers);
+
+  ASSERT_EQ(-EINVAL, mirror_peer_ping(&ioctx, "", "mirror uuid"));
+  ASSERT_EQ(-EINVAL, mirror_peer_ping(&ioctx, "new site", ""));
+  ASSERT_EQ(0, mirror_peer_ping(&ioctx, "new site", "mirror uuid"));
+
+  ASSERT_EQ(0, mirror_peer_list(&ioctx, &peers));
+  ASSERT_EQ(1U, peers.size());
+  ASSERT_LT(utime_t{}, peers[0].last_seen);
+  expected_peers = {
+    {"uuid3", MIRROR_PEER_DIRECTION_RX_TX, "new site", "admin", "mirror uuid"}};
+  expected_peers[0].last_seen = peers[0].last_seen;
+  ASSERT_EQ(expected_peers, peers);
+  ASSERT_EQ(0, mirror_peer_remove(&ioctx, "uuid3"));
+
+  ASSERT_EQ(0, mirror_peer_ping(&ioctx, "siteA", "mirror uuid"));
+
+  ASSERT_EQ(0, mirror_peer_list(&ioctx, &peers));
+  ASSERT_EQ(1U, peers.size());
+  ASSERT_FALSE(peers[0].uuid.empty());
+  ASSERT_LT(utime_t{}, peers[0].last_seen);
+  expected_peers = {
+    {peers[0].uuid, MIRROR_PEER_DIRECTION_TX, "siteA", "", "mirror uuid"}};
+  expected_peers[0].last_seen = peers[0].last_seen;
+  ASSERT_EQ(expected_peers, peers);
+
+  ASSERT_EQ(-EBUSY, mirror_mode_set(&ioctx, cls::rbd::MIRROR_MODE_DISABLED));
+  ASSERT_EQ(0, mirror_peer_remove(&ioctx, peers[0].uuid));
+
+  ASSERT_EQ(0, mirror_peer_remove(&ioctx, "DNE"));
   ASSERT_EQ(0, mirror_peer_list(&ioctx, &peers));
   expected_peers = {};
   ASSERT_EQ(expected_peers, peers);
@@ -1601,9 +1736,12 @@ TEST_F(TestClsRbd, mirror_image) {
   std::map<std::string, std::string> mirror_image_ids;
   ASSERT_EQ(-ENOENT, mirror_image_list(&ioctx, "", 0, &mirror_image_ids));
 
-  cls::rbd::MirrorImage image1("uuid1", cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
-  cls::rbd::MirrorImage image2("uuid2", cls::rbd::MIRROR_IMAGE_STATE_DISABLING);
-  cls::rbd::MirrorImage image3("uuid3", cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
+  cls::rbd::MirrorImage image1(cls::rbd::MIRROR_IMAGE_MODE_JOURNAL, "uuid1",
+                               cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
+  cls::rbd::MirrorImage image2(cls::rbd::MIRROR_IMAGE_MODE_JOURNAL, "uuid2",
+                               cls::rbd::MIRROR_IMAGE_STATE_DISABLING);
+  cls::rbd::MirrorImage image3(cls::rbd::MIRROR_IMAGE_MODE_JOURNAL, "uuid3",
+                               cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
 
   ASSERT_EQ(0, mirror_image_set(&ioctx, "image_id1", image1));
   ASSERT_EQ(-ENOENT, mirror_image_set(&ioctx, "image_id2", image2));
@@ -1692,17 +1830,23 @@ TEST_F(TestClsRbd, mirror_image_status) {
 
   // Test status set
 
-  cls::rbd::MirrorImage image1("uuid1", cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
-  cls::rbd::MirrorImage image2("uuid2", cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
-  cls::rbd::MirrorImage image3("uuid3", cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
+  cls::rbd::MirrorImage image1(cls::rbd::MIRROR_IMAGE_MODE_JOURNAL, "uuid1",
+                               cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
+  cls::rbd::MirrorImage image2(cls::rbd::MIRROR_IMAGE_MODE_JOURNAL, "uuid2",
+                               cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
+  cls::rbd::MirrorImage image3(cls::rbd::MIRROR_IMAGE_MODE_JOURNAL, "uuid3",
+                               cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
 
   ASSERT_EQ(0, mirror_image_set(&ioctx, "image_id1", image1));
   ASSERT_EQ(0, mirror_image_set(&ioctx, "image_id2", image2));
   ASSERT_EQ(0, mirror_image_set(&ioctx, "image_id3", image3));
 
-  cls::rbd::MirrorImageStatus status1(cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
-  cls::rbd::MirrorImageStatus status2(cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING);
-  cls::rbd::MirrorImageStatus status3(cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR);
+  cls::rbd::MirrorImageSiteStatus status1(
+    "", cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN, "");
+  cls::rbd::MirrorImageSiteStatus status2(
+    "", cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING, "");
+  cls::rbd::MirrorImageSiteStatus status3(
+    "", cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR, "");
 
   ASSERT_EQ(0, mirror_image_status_set(&ioctx, "uuid1", status1));
   images.clear();
@@ -1714,13 +1858,15 @@ TEST_F(TestClsRbd, mirror_image_status) {
   // Test status is down due to RBD_MIRRORING is not watched
 
   status1.up = false;
-  ASSERT_EQ(statuses["image_id1"], status1);
+  ASSERT_EQ(statuses["image_id1"], cls::rbd::MirrorImageStatus{{status1}});
   ASSERT_EQ(0, mirror_image_status_get(&ioctx, "uuid1", &read_status));
-  ASSERT_EQ(read_status, status1);
+  ASSERT_EQ(read_status, cls::rbd::MirrorImageStatus{{status1}});
 
   // Test status summary. All statuses are unknown due to down.
   states.clear();
-  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, &states));
+  cls::rbd::MirrorPeer mirror_peer{
+    "uuid", cls::rbd::MIRROR_PEER_DIRECTION_RX, "siteA", "client", "fsidA"};
+  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, {mirror_peer}, &states));
   ASSERT_EQ(1U, states.size());
   ASSERT_EQ(3, states[cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN]);
 
@@ -1739,7 +1885,7 @@ TEST_F(TestClsRbd, mirror_image_status) {
   ASSERT_EQ(0, mirror_image_status_list(&ioctx, "", 1024, &images, &statuses));
   ASSERT_EQ(3U, images.size());
   ASSERT_TRUE(statuses.empty());
-  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, &states));
+  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, {mirror_peer}, &states));
   ASSERT_EQ(1U, states.size());
   ASSERT_EQ(3, states[cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN]);
 
@@ -1755,22 +1901,22 @@ TEST_F(TestClsRbd, mirror_image_status) {
 
   ASSERT_EQ(0, mirror_image_status_get(&ioctx, "uuid1", &read_status));
   status1.up = true;
-  ASSERT_EQ(read_status, status1);
+  ASSERT_EQ(read_status, cls::rbd::MirrorImageStatus{{status1}});
   ASSERT_EQ(0, mirror_image_status_get(&ioctx, "uuid2", &read_status));
   status2.up = true;
-  ASSERT_EQ(read_status, status2);
+  ASSERT_EQ(read_status, cls::rbd::MirrorImageStatus{{status2}});
   ASSERT_EQ(0, mirror_image_status_get(&ioctx, "uuid3", &read_status));
   status3.up = true;
-  ASSERT_EQ(read_status, status3);
+  ASSERT_EQ(read_status, cls::rbd::MirrorImageStatus{{status3}});
 
   images.clear();
   statuses.clear();
   ASSERT_EQ(0, mirror_image_status_list(&ioctx, "", 1024, &images, &statuses));
   ASSERT_EQ(3U, images.size());
   ASSERT_EQ(3U, statuses.size());
-  ASSERT_EQ(statuses["image_id1"], status1);
-  ASSERT_EQ(statuses["image_id2"], status2);
-  ASSERT_EQ(statuses["image_id3"], status3);
+  ASSERT_EQ(statuses["image_id1"], cls::rbd::MirrorImageStatus{{status1}});
+  ASSERT_EQ(statuses["image_id2"], cls::rbd::MirrorImageStatus{{status2}});
+  ASSERT_EQ(statuses["image_id3"], cls::rbd::MirrorImageStatus{{status3}});
 
   read_instance = {};
   ASSERT_EQ(0, mirror_image_instance_get(&ioctx, "uuid1", &read_instance));
@@ -1784,18 +1930,18 @@ TEST_F(TestClsRbd, mirror_image_status) {
 
   ASSERT_EQ(0, mirror_image_status_remove_down(&ioctx));
   ASSERT_EQ(0, mirror_image_status_get(&ioctx, "uuid1", &read_status));
-  ASSERT_EQ(read_status, status1);
+  ASSERT_EQ(read_status, cls::rbd::MirrorImageStatus{{status1}});
   images.clear();
   statuses.clear();
   ASSERT_EQ(0, mirror_image_status_list(&ioctx, "", 1024, &images, &statuses));
   ASSERT_EQ(3U, images.size());
   ASSERT_EQ(3U, statuses.size());
-  ASSERT_EQ(statuses["image_id1"], status1);
-  ASSERT_EQ(statuses["image_id2"], status2);
-  ASSERT_EQ(statuses["image_id3"], status3);
+  ASSERT_EQ(statuses["image_id1"], cls::rbd::MirrorImageStatus{{status1}});
+  ASSERT_EQ(statuses["image_id2"], cls::rbd::MirrorImageStatus{{status2}});
+  ASSERT_EQ(statuses["image_id3"], cls::rbd::MirrorImageStatus{{status3}});
 
   states.clear();
-  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, &states));
+  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, {mirror_peer}, &states));
   ASSERT_EQ(3U, states.size());
   ASSERT_EQ(1, states[cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN]);
   ASSERT_EQ(1, states[cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING]);
@@ -1807,63 +1953,104 @@ TEST_F(TestClsRbd, mirror_image_status) {
   ASSERT_EQ(0, mirror_image_status_set(&ioctx, "uuid1", status1));
   ASSERT_EQ(0, mirror_image_status_set(&ioctx, "uuid3", status3));
   ASSERT_EQ(0, mirror_image_status_get(&ioctx, "uuid3", &read_status));
-  ASSERT_EQ(read_status, status3);
+  ASSERT_EQ(read_status, cls::rbd::MirrorImageStatus{{status3}});
 
   states.clear();
-  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, &states));
+  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, {mirror_peer}, &states));
   ASSERT_EQ(1U, states.size());
   ASSERT_EQ(3, states[cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING]);
 
-  // Test remove
+  // Remote status
 
-  ASSERT_EQ(0, mirror_image_status_remove(&ioctx, "uuid3"));
-  ASSERT_EQ(-ENOENT, mirror_image_status_get(&ioctx, "uuid3", &read_status));
-  images.clear();
-  statuses.clear();
-  ASSERT_EQ(0, mirror_image_status_list(&ioctx, "", 1024, &images, &statuses));
-  ASSERT_EQ(3U, images.size());
-  ASSERT_EQ(2U, statuses.size());
-  ASSERT_EQ(statuses["image_id1"], status1);
-  ASSERT_EQ(statuses["image_id2"], status2);
-
-  states.clear();
-  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, &states));
-  ASSERT_EQ(2U, states.size());
-  ASSERT_EQ(1, states[cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN]);
-  ASSERT_EQ(2, states[cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING]);
+  ASSERT_EQ(0, mirror_uuid_set(&ioctx, "mirror-uuid"));
+  ASSERT_EQ(0, mirror_mode_set(&ioctx, cls::rbd::MIRROR_MODE_POOL));
 
-  // Test statuses are down after removing watcher
+  ASSERT_EQ(0, mirror_image_status_get(&ioctx, "uuid1", &read_status));
+  cls::rbd::MirrorImageStatus expected_status1({status1});
+  ASSERT_EQ(expected_status1, read_status);
 
+  cls::rbd::MirrorImageSiteStatus remote_status1(
+    "fsidA", cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING, "");
+  ASSERT_EQ(0, mirror_image_status_set(&ioctx, "uuid1", remote_status1));
+  ASSERT_EQ(0, mirror_image_status_get(&ioctx, "uuid1", &read_status));
+  remote_status1.up = true;
+  expected_status1 = {{status1, remote_status1}};
+  ASSERT_EQ(expected_status1, read_status);
+
+  // summary under different modes
+  cls::rbd::MirrorImageSiteStatus remote_status2(
+    "fsidA", cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING, "");
+  remote_status2.up = true;
+  cls::rbd::MirrorImageSiteStatus remote_status3(
+    "fsidA", cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN, "");
+  remote_status3.up = true;
+
+  status1.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
   ASSERT_EQ(0, mirror_image_status_set(&ioctx, "uuid1", status1));
   ASSERT_EQ(0, mirror_image_status_set(&ioctx, "uuid2", status2));
   ASSERT_EQ(0, mirror_image_status_set(&ioctx, "uuid3", status3));
+  ASSERT_EQ(0, mirror_image_status_set(&ioctx, "uuid2", remote_status2));
+  ASSERT_EQ(0, mirror_image_status_set(&ioctx, "uuid3", remote_status3));
+
+  expected_status1 = {{status1, remote_status1}};
+  cls::rbd::MirrorImageStatus expected_status2({status2, remote_status2});
+  cls::rbd::MirrorImageStatus expected_status3({status3, remote_status3});
 
   images.clear();
   statuses.clear();
   ASSERT_EQ(0, mirror_image_status_list(&ioctx, "", 1024, &images, &statuses));
   ASSERT_EQ(3U, images.size());
   ASSERT_EQ(3U, statuses.size());
-  ASSERT_EQ(statuses["image_id1"], status1);
-  ASSERT_EQ(statuses["image_id2"], status2);
-  ASSERT_EQ(statuses["image_id3"], status3);
+  ASSERT_EQ(statuses["image_id1"], expected_status1);
+  ASSERT_EQ(statuses["image_id2"], expected_status2);
+  ASSERT_EQ(statuses["image_id3"], expected_status3);
+
+  states.clear();
+  mirror_peer.mirror_peer_direction = cls::rbd::MIRROR_PEER_DIRECTION_RX;
+  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, {mirror_peer}, &states));
+  ASSERT_EQ(2U, states.size());
+  ASSERT_EQ(2, states[cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING]);
+  ASSERT_EQ(1, states[cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR]);
+
+  states.clear();
+  mirror_peer.mirror_peer_direction = cls::rbd::MIRROR_PEER_DIRECTION_TX;
+  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, {mirror_peer}, &states));
+  ASSERT_EQ(2U, states.size());
+  ASSERT_EQ(2, states[cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING]);
+  ASSERT_EQ(1, states[cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN]);
+
+  states.clear();
+  mirror_peer.mirror_peer_direction = cls::rbd::MIRROR_PEER_DIRECTION_RX_TX;
+  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, {mirror_peer}, &states));
+  ASSERT_EQ(3U, states.size());
+  ASSERT_EQ(1, states[cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING]);
+  ASSERT_EQ(1, states[cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN]);
+  ASSERT_EQ(1, states[cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR]);
 
+  // Test statuses are down after removing watcher
   ioctx.unwatch2(watch_handle);
 
   ASSERT_EQ(0, mirror_image_status_list(&ioctx, "", 1024, &images, &statuses));
   ASSERT_EQ(3U, images.size());
   ASSERT_EQ(3U, statuses.size());
   status1.up = false;
-  ASSERT_EQ(statuses["image_id1"], status1);
+  remote_status1.up = false;
+  expected_status1 = {{status1, remote_status1}};
+  ASSERT_EQ(statuses["image_id1"], expected_status1);
   status2.up = false;
-  ASSERT_EQ(statuses["image_id2"], status2);
+  remote_status2.up = false;
+  expected_status2 = {{status2, remote_status2}};
+  ASSERT_EQ(statuses["image_id2"], expected_status2);
   status3.up = false;
-  ASSERT_EQ(statuses["image_id3"], status3);
+  remote_status3.up = false;
+  expected_status3 = {{status3, remote_status3}};
+  ASSERT_EQ(statuses["image_id3"], expected_status3);
 
   ASSERT_EQ(0, mirror_image_status_get(&ioctx, "uuid1", &read_status));
-  ASSERT_EQ(read_status, status1);
+  ASSERT_EQ(read_status, expected_status1);
 
   states.clear();
-  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, &states));
+  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, {mirror_peer}, &states));
   ASSERT_EQ(1U, states.size());
   ASSERT_EQ(3, states[cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN]);
 
@@ -1882,7 +2069,7 @@ TEST_F(TestClsRbd, mirror_image_status) {
   ASSERT_TRUE(statuses.empty());
 
   states.clear();
-  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, &states));
+  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, {mirror_peer}, &states));
   ASSERT_EQ(1U, states.size());
   ASSERT_EQ(3, states[cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN]);
 
@@ -1901,7 +2088,7 @@ TEST_F(TestClsRbd, mirror_image_status) {
   ASSERT_EQ(0, mirror_image_remove(&ioctx, "image_id3"));
 
   states.clear();
-  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, &states));
+  ASSERT_EQ(0, mirror_image_status_get_summary(&ioctx, {}, &states));
   ASSERT_EQ(0U, states.size());
 
   // Test status list with large number of images
@@ -1912,8 +2099,10 @@ TEST_F(TestClsRbd, mirror_image_status) {
   for (size_t i = 0; i < N; i++) {
     std::string id = "id" + stringify(i);
     std::string uuid = "uuid" + stringify(i);
-    cls::rbd::MirrorImage image(uuid, cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
-    cls::rbd::MirrorImageStatus status(cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN);
+    cls::rbd::MirrorImage image(cls::rbd::MIRROR_IMAGE_MODE_JOURNAL, uuid,
+                                cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
+    cls::rbd::MirrorImageSiteStatus status(
+      "", cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN, "");
     ASSERT_EQ(0, mirror_image_set(&ioctx, id, image));
     ASSERT_EQ(0, mirror_image_status_set(&ioctx, uuid, status));
   }
@@ -2043,6 +2232,78 @@ TEST_F(TestClsRbd, mirror_instances) {
   ASSERT_EQ(0U, instance_ids.size());
 }
 
+TEST_F(TestClsRbd, mirror_snapshot) {
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+  string oid = get_temp_image_name();
+  ASSERT_EQ(0, create_image(&ioctx, oid, 10, 22, 0, oid, -1));
+
+  cls::rbd::MirrorSnapshotNamespace primary = {
+    cls::rbd::MIRROR_SNAPSHOT_STATE_PRIMARY, {"peer1", "peer2"}, "",
+    CEPH_NOSNAP};
+  cls::rbd::MirrorSnapshotNamespace non_primary = {
+    cls::rbd::MIRROR_SNAPSHOT_STATE_NON_PRIMARY, {"peer1"}, "uuid", 123};
+  librados::ObjectWriteOperation op;
+  ::librbd::cls_client::snapshot_add(&op, 1, "primary", primary);
+  ::librbd::cls_client::snapshot_add(&op, 2, "non_primary", non_primary);
+  ASSERT_EQ(0, ioctx.operate(oid, &op));
+
+  cls::rbd::SnapshotInfo snap;
+  ASSERT_EQ(0, snapshot_get(&ioctx, oid, 1, &snap));
+  auto sn = boost::get<cls::rbd::MirrorSnapshotNamespace>(
+    &snap.snapshot_namespace);
+  ASSERT_NE(nullptr, sn);
+  ASSERT_EQ(primary, *sn);
+  ASSERT_EQ(2U, sn->mirror_peer_uuids.size());
+  ASSERT_EQ(1U, sn->mirror_peer_uuids.count("peer1"));
+  ASSERT_EQ(1U, sn->mirror_peer_uuids.count("peer2"));
+
+  ASSERT_EQ(-ENOENT, mirror_image_snapshot_unlink_peer(&ioctx, oid, 1, "peer"));
+  ASSERT_EQ(0, mirror_image_snapshot_unlink_peer(&ioctx, oid, 1, "peer1"));
+  ASSERT_EQ(-ENOENT, mirror_image_snapshot_unlink_peer(&ioctx, oid, 1,
+                                                       "peer1"));
+  ASSERT_EQ(0, snapshot_get(&ioctx, oid, 1, &snap));
+  sn = boost::get<cls::rbd::MirrorSnapshotNamespace>(
+    &snap.snapshot_namespace);
+  ASSERT_NE(nullptr, sn);
+  ASSERT_EQ(1U, sn->mirror_peer_uuids.size());
+  ASSERT_EQ(1U, sn->mirror_peer_uuids.count("peer2"));
+
+  ASSERT_EQ(-ERESTART,
+            mirror_image_snapshot_unlink_peer(&ioctx, oid, 1, "peer2"));
+  ASSERT_EQ(0, snapshot_get(&ioctx, oid, 1, &snap));
+  sn = boost::get<cls::rbd::MirrorSnapshotNamespace>(
+    &snap.snapshot_namespace);
+  ASSERT_NE(nullptr, sn);
+  ASSERT_EQ(1U, sn->mirror_peer_uuids.size());
+  ASSERT_EQ(1U, sn->mirror_peer_uuids.count("peer2"));
+
+  ASSERT_EQ(0, snapshot_get(&ioctx, oid, 2, &snap));
+  auto nsn = boost::get<cls::rbd::MirrorSnapshotNamespace>(
+    &snap.snapshot_namespace);
+  ASSERT_NE(nullptr, nsn);
+  ASSERT_EQ(non_primary, *nsn);
+  ASSERT_EQ(1U, nsn->mirror_peer_uuids.size());
+  ASSERT_EQ(1U, nsn->mirror_peer_uuids.count("peer1"));
+  ASSERT_FALSE(nsn->complete);
+  ASSERT_EQ(nsn->last_copied_object_number, 0);
+
+  ASSERT_EQ(0, mirror_image_snapshot_set_copy_progress(&ioctx, oid, 2, true,
+                                                       10));
+  ASSERT_EQ(0, snapshot_get(&ioctx, oid, 2, &snap));
+  nsn = boost::get<cls::rbd::MirrorSnapshotNamespace>(
+    &snap.snapshot_namespace);
+  ASSERT_NE(nullptr, nsn);
+  ASSERT_TRUE(nsn->complete);
+  ASSERT_EQ(nsn->last_copied_object_number, 10);
+
+  ASSERT_EQ(0, mirror_image_snapshot_unlink_peer(&ioctx, oid, 2, "peer1"));
+
+  ASSERT_EQ(0, snapshot_remove(&ioctx, oid, 1));
+  ASSERT_EQ(0, snapshot_remove(&ioctx, oid, 2));
+}
+
 TEST_F(TestClsRbd, group_dir_list) {
   librados::IoCtx ioctx;
   ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
@@ -2864,6 +3125,7 @@ TEST_F(TestClsRbd, migration)
 
   cls::rbd::MigrationSpec migration_spec(cls::rbd::MIGRATION_HEADER_TYPE_DST, 1,
                                          "name", "ns", "id", {}, 0, false,
+                                         cls::rbd::MIRROR_IMAGE_MODE_JOURNAL,
                                          false,
                                          cls::rbd::MIGRATION_STATE_PREPARING,
                                          "123");
@@ -2934,6 +3196,7 @@ TEST_F(TestClsRbd, migration_v1)
 
   cls::rbd::MigrationSpec migration_spec(cls::rbd::MIGRATION_HEADER_TYPE_DST, 1,
                                          "name", "ns", "id", {}, 0, false,
+                                         cls::rbd::MIRROR_IMAGE_MODE_JOURNAL,
                                          false,
                                          cls::rbd::MIGRATION_STATE_PREPARING,
                                          "123");