]> git.proxmox.com Git - ceph.git/blobdiff - ceph/qa/tasks/cephfs/test_volumes.py
import 15.2.5
[ceph.git] / ceph / qa / tasks / cephfs / test_volumes.py
index 11c23605ae8c99ad6875214ae76f841030bac9f0..88fbe04cd278fe69d71a31d41aa829d44e1a77db 100644 (file)
@@ -21,6 +21,7 @@ class TestVolumes(CephFSTestCase):
 
     # for filling subvolume with data
     CLIENTS_REQUIRED = 1
+    MDSS_REQUIRED = 2
 
     # io defaults
     DEFAULT_FILE_SIZE = 1 # MB
@@ -29,6 +30,9 @@ class TestVolumes(CephFSTestCase):
     def _fs_cmd(self, *args):
         return self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", *args)
 
+    def _raw_cmd(self, *args):
+        return self.mgr_cluster.mon_manager.raw_cluster_cmd(*args)
+
     def __check_clone_state(self, state, clone, clone_group=None, timo=120):
         check = 0
         args = ["clone", "status", self.volname, clone]
@@ -105,28 +109,33 @@ class TestVolumes(CephFSTestCase):
         self._verify_clone_attrs(subvolume, clone, source_group=source_group, clone_group=clone_group)
 
     def _generate_random_volume_name(self, count=1):
-        r = random.sample(range(10000), count)
-        volumes = ["{0}_{1}".format(TestVolumes.TEST_VOLUME_PREFIX, c) for c in r]
+        n = self.volume_start
+        volumes = [f"{TestVolumes.TEST_VOLUME_PREFIX}_{i:016}" for i in range(n, n+count)]
+        self.volume_start += count
         return volumes[0] if count == 1 else volumes
 
     def _generate_random_subvolume_name(self, count=1):
-        r = random.sample(range(10000), count)
-        subvolumes = ["{0}_{1}".format(TestVolumes.TEST_SUBVOLUME_PREFIX, c) for c in r]
+        n = self.subvolume_start
+        subvolumes = [f"{TestVolumes.TEST_SUBVOLUME_PREFIX}_{i:016}" for i in range(n, n+count)]
+        self.subvolume_start += count
         return subvolumes[0] if count == 1 else subvolumes
 
     def _generate_random_group_name(self, count=1):
-        r = random.sample(range(100), count)
-        groups = ["{0}_{1}".format(TestVolumes.TEST_GROUP_PREFIX, c) for c in r]
+        n = self.group_start
+        groups = [f"{TestVolumes.TEST_GROUP_PREFIX}_{i:016}" for i in range(n, n+count)]
+        self.group_start += count
         return groups[0] if count == 1 else groups
 
     def _generate_random_snapshot_name(self, count=1):
-        r = random.sample(range(100), count)
-        snaps = ["{0}_{1}".format(TestVolumes.TEST_SNAPSHOT_PREFIX, c) for c in r]
+        n = self.snapshot_start
+        snaps = [f"{TestVolumes.TEST_SNAPSHOT_PREFIX}_{i:016}" for i in range(n, n+count)]
+        self.snapshot_start += count
         return snaps[0] if count == 1 else snaps
 
     def _generate_random_clone_name(self, count=1):
-        r = random.sample(range(1000), count)
-        clones = ["{0}_{1}".format(TestVolumes.TEST_CLONE_PREFIX, c) for c in r]
+        n = self.clone_start
+        clones = [f"{TestVolumes.TEST_CLONE_PREFIX}_{i:016}" for i in range(n, n+count)]
+        self.clone_start += count
         return clones[0] if count == 1 else clones
 
     def _enable_multi_fs(self):
@@ -225,6 +234,12 @@ class TestVolumes(CephFSTestCase):
         self.vol_created = False
         self._enable_multi_fs()
         self._create_or_reuse_test_volume()
+        self.config_set('mon', 'mon_allow_pool_delete', True)
+        self.volume_start = random.randint(1, (1<<20))
+        self.subvolume_start = random.randint(1, (1<<20))
+        self.group_start = random.randint(1, (1<<20))
+        self.snapshot_start = random.randint(1, (1<<20))
+        self.clone_start = random.randint(1, (1<<20))
 
     def tearDown(self):
         if self.vol_created:
@@ -310,6 +325,54 @@ class TestVolumes(CephFSTestCase):
         else:
             raise RuntimeError("expected the 'fs volume rm' command to fail.")
 
+    def test_volume_rm_arbitrary_pool_removal(self):
+        """
+        That the arbitrary pool added to the volume out of band is removed
+        successfully on volume removal.
+        """
+        new_pool = "new_pool"
+        # add arbitrary data pool
+        self.fs.add_data_pool(new_pool)
+        vol_status = json.loads(self._fs_cmd("status", self.volname, "--format=json-pretty"))
+        self._fs_cmd("volume", "rm", self.volname, "--yes-i-really-mean-it")
+
+        #check if fs is gone
+        volumes = json.loads(self._fs_cmd("volume", "ls", "--format=json-pretty"))
+        volnames = [volume['name'] for volume in volumes]
+        self.assertNotIn(self.volname, volnames)
+
+        #check if osd pools are gone
+        pools = json.loads(self._raw_cmd("osd", "pool", "ls", "--format=json-pretty"))
+        for pool in vol_status["pools"]:
+            self.assertNotIn(pool["name"], pools)
+
+    def test_volume_rm_when_mon_delete_pool_false(self):
+        """
+        That the volume can only be removed when mon_allowd_pool_delete is set
+        to true and verify that the pools are removed after volume deletion.
+        """
+        self.config_set('mon', 'mon_allow_pool_delete', False)
+        try:
+            self._fs_cmd("volume", "rm", self.volname, "--yes-i-really-mean-it")
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.EPERM,
+                             "expected the 'fs volume rm' command to fail with EPERM, "
+                             "but it failed with {0}".format(ce.exitstatus))
+        vol_status = json.loads(self._fs_cmd("status", self.volname, "--format=json-pretty"))
+        self.config_set('mon', 'mon_allow_pool_delete', True)
+        self._fs_cmd("volume", "rm", self.volname, "--yes-i-really-mean-it")
+
+        #check if fs is gone
+        volumes = json.loads(self._fs_cmd("volume", "ls", "--format=json-pretty"))
+        volnames = [volume['name'] for volume in volumes]
+        self.assertNotIn(self.volname, volnames,
+                         "volume {0} exists after removal".format(self.volname))
+        #check if pools are gone
+        pools = json.loads(self._raw_cmd("osd", "pool", "ls", "--format=json-pretty"))
+        for pool in vol_status["pools"]:
+            self.assertNotIn(pool["name"], pools,
+                             "pool {0} exists after volume removal".format(pool["name"]))
+
     ### basic subvolume operations
 
     def test_subvolume_create_and_rm(self):
@@ -592,6 +655,44 @@ class TestVolumes(CephFSTestCase):
         # verify trash dir is clean
         self._wait_for_trash_empty()
 
+    def test_subvolume_pin_export(self):
+        self.fs.set_max_mds(2)
+        status = self.fs.wait_for_daemons()
+
+        subvolume = self._generate_random_subvolume_name()
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+        self._fs_cmd("subvolume", "pin", self.volname, subvolume, "export", "1")
+        path = self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
+        path = os.path.dirname(path) # get subvolume path
+
+        self._get_subtrees(status=status, rank=1)
+        self._wait_subtrees([(path, 1)], status=status)
+
+    def test_subvolumegroup_pin_distributed(self):
+        self.fs.set_max_mds(2)
+        status = self.fs.wait_for_daemons()
+        self.config_set('mds', 'mds_export_ephemeral_distributed', True)
+
+        group = "pinme"
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+        self._fs_cmd("subvolumegroup", "pin", self.volname, group, "distributed", "True")
+        # (no effect on distribution) pin the group directory to 0 so rank 0 has all subtree bounds visible
+        self._fs_cmd("subvolumegroup", "pin", self.volname, group, "export", "0")
+        subvolumes = self._generate_random_subvolume_name(10)
+        for subvolume in subvolumes:
+            self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+        self._wait_distributed_subtrees(10, status=status)
+
+    def test_subvolume_pin_random(self):
+        self.fs.set_max_mds(2)
+        self.fs.wait_for_daemons()
+        self.config_set('mds', 'mds_export_ephemeral_random', True)
+
+        subvolume = self._generate_random_subvolume_name()
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+        self._fs_cmd("subvolume", "pin", self.volname, subvolume, "random", ".01")
+        # no verification
+
     def test_subvolume_create_isolated_namespace(self):
         """
         Create subvolume in separate rados namespace
@@ -792,7 +893,7 @@ class TestVolumes(CephFSTestCase):
 
         subvol_md = ["atime", "bytes_pcent", "bytes_quota", "bytes_used", "created_at", "ctime",
                      "data_pool", "gid", "mode", "mon_addrs", "mtime", "path", "pool_namespace",
-                     "type", "uid"]
+                     "type", "uid", "features"]
 
         # create subvolume
         subvolume = self._generate_random_subvolume_name()
@@ -800,37 +901,34 @@ class TestVolumes(CephFSTestCase):
 
         # get subvolume metadata
         subvol_info = json.loads(self._get_subvolume_info(self.volname, subvolume))
-        if len(subvol_info) == 0:
-            raise RuntimeError("Expected the 'fs subvolume info' command to list metadata of subvolume")
+        self.assertNotEqual(len(subvol_info), 0, "expected the 'fs subvolume info' command to list metadata of subvolume")
         for md in subvol_md:
-            if md not in subvol_info.keys():
-                raise RuntimeError("%s not present in the metadata of subvolume" % md)
+            self.assertIn(md, subvol_info.keys(), "'{0}' key not present in metadata of subvolume".format(md))
 
-        if subvol_info["bytes_pcent"] != "undefined":
-            raise RuntimeError("bytes_pcent should be set to undefined if quota is not set")
+        self.assertEqual(subvol_info["bytes_pcent"], "undefined", "bytes_pcent should be set to undefined if quota is not set")
+        self.assertEqual(subvol_info["bytes_quota"], "infinite", "bytes_quota should be set to infinite if quota is not set")
+        self.assertEqual(subvol_info["pool_namespace"], "", "expected pool namespace to be empty")
 
-        if subvol_info["bytes_quota"] != "infinite":
-            raise RuntimeError("bytes_quota should be set to infinite if quota is not set")
-        self.assertEqual(subvol_info["pool_namespace"], "")
+        self.assertEqual(len(subvol_info["features"]), 2,
+                         msg="expected 2 features, found '{0}' ({1})".format(len(subvol_info["features"]), subvol_info["features"]))
+        for feature in ['snapshot-clone', 'snapshot-autoprotect']:
+            self.assertIn(feature, subvol_info["features"], msg="expected feature '{0}' in subvolume".format(feature))
 
         nsize = self.DEFAULT_FILE_SIZE*1024*1024
-        try:
-            self._fs_cmd("subvolume", "resize", self.volname, subvolume, str(nsize))
-        except CommandFailedError:
-            raise RuntimeError("expected the 'fs subvolume resize' command to succeed")
+        self._fs_cmd("subvolume", "resize", self.volname, subvolume, str(nsize))
 
         # get subvolume metadata after quota set
         subvol_info = json.loads(self._get_subvolume_info(self.volname, subvolume))
-        if len(subvol_info) == 0:
-            raise RuntimeError("Expected the 'fs subvolume info' command to list metadata of subvolume")
-        if subvol_info["bytes_pcent"] == "undefined":
-            raise RuntimeError("bytes_pcent should not be set to undefined if quota is set")
+        self.assertNotEqual(len(subvol_info), 0, "expected the 'fs subvolume info' command to list metadata of subvolume")
 
-        if subvol_info["bytes_quota"] == "infinite":
-            raise RuntimeError("bytes_quota should not be set to infinite if quota is set")
+        self.assertNotEqual(subvol_info["bytes_pcent"], "undefined", "bytes_pcent should not be set to undefined if quota is not set")
+        self.assertNotEqual(subvol_info["bytes_quota"], "infinite", "bytes_quota should not be set to infinite if quota is not set")
+        self.assertEqual(subvol_info["type"], "subvolume", "type should be set to subvolume")
 
-        if subvol_info["type"] != "subvolume":
-            raise RuntimeError("type should be set to subvolume")
+        self.assertEqual(len(subvol_info["features"]), 2,
+                         msg="expected 2 features, found '{0}' ({1})".format(len(subvol_info["features"]), subvol_info["features"]))
+        for feature in ['snapshot-clone', 'snapshot-autoprotect']:
+            self.assertIn(feature, subvol_info["features"], msg="expected feature '{0}' in subvolume".format(feature))
 
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
@@ -858,18 +956,12 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
@@ -912,8 +1004,7 @@ class TestVolumes(CephFSTestCase):
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
     def test_subvolume_group_create_with_desired_data_pool_layout(self):
-        group1 = self._generate_random_group_name()
-        group2 = self._generate_random_group_name()
+        group1, group2 = self._generate_random_group_name(2)
 
         # create group
         self._fs_cmd("subvolumegroup", "create", self.volname, group1)
@@ -974,8 +1065,7 @@ class TestVolumes(CephFSTestCase):
             raise RuntimeError("expected the 'fs subvolumegroup getpath' command to fail")
 
     def test_subvolume_create_with_desired_data_pool_layout_in_group(self):
-        subvol1 = self._generate_random_subvolume_name()
-        subvol2 = self._generate_random_subvolume_name()
+        subvol1, subvol2 = self._generate_random_subvolume_name(2)
         group = self._generate_random_group_name()
 
         # create group. this also helps set default pool layout for subvolumes
@@ -1006,8 +1096,7 @@ class TestVolumes(CephFSTestCase):
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
     def test_subvolume_group_create_with_desired_mode(self):
-        group1 = self._generate_random_group_name()
-        group2 = self._generate_random_group_name()
+        group1, group2 = self._generate_random_group_name(2)
         # default mode
         expected_mode1 = "755"
         # desired mode
@@ -1055,9 +1144,8 @@ class TestVolumes(CephFSTestCase):
         self._fs_cmd("subvolumegroup", "rm", self.volname, subvolgroupname)
 
     def test_subvolume_create_with_desired_mode_in_group(self):
-        subvol1 = self._generate_random_subvolume_name()
-        subvol2 = self._generate_random_subvolume_name()
-        subvol3 = self._generate_random_subvolume_name()
+        subvol1, subvol2, subvol3 = self._generate_random_subvolume_name(3)
+
         group = self._generate_random_group_name()
         # default mode
         expected_mode1 = "755"
@@ -1198,7 +1286,7 @@ class TestVolumes(CephFSTestCase):
         tests the 'fs subvolume snapshot info' command
         """
 
-        snap_metadata = ["created_at", "data_pool", "has_pending_clones", "protected", "size"]
+        snap_metadata = ["created_at", "data_pool", "has_pending_clones", "size"]
 
         subvolume = self._generate_random_subvolume_name()
         snapshot = self._generate_random_snapshot_name()
@@ -1212,20 +1300,13 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
         snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot))
         self.assertNotEqual(len(snap_info), 0)
         for md in snap_metadata:
             if md not in snap_info:
                 raise RuntimeError("%s not present in the metadata of subvolume snapshot" % md)
-        self.assertEqual(snap_info["protected"], "yes")
         self.assertEqual(snap_info["has_pending_clones"], "no")
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
@@ -1571,21 +1652,20 @@ class TestVolumes(CephFSTestCase):
         # verify trash dir is clean
         self._wait_for_trash_empty()
 
-    def test_subvolume_snapshot_protect_unprotect(self):
+    def test_subvolume_snapshot_protect_unprotect_sanity(self):
+        """
+        Snapshot protect/unprotect commands are deprecated. This test exists to ensure that
+        invoking the command does not cause errors, till they are removed from a subsequent release.
+        """
         subvolume = self._generate_random_subvolume_name()
         snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
 
         # create subvolume
         self._fs_cmd("subvolume", "create", self.volname, subvolume)
 
-        # protect a nonexistent snapshot
-        try:
-            self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-        except CommandFailedError as ce:
-            if ce.exitstatus != errno.ENOENT:
-                raise RuntimeError("invalid error code when protecting a non-existing snapshot")
-        else:
-            raise RuntimeError("expected protection of non existent snapshot to fail")
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=64)
 
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
@@ -1593,23 +1673,11 @@ class TestVolumes(CephFSTestCase):
         # now, protect snapshot
         self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
 
-        # protecting snapshot again, should return EEXIST
-        try:
-            self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-        except CommandFailedError as ce:
-            if ce.exitstatus != errno.EEXIST:
-                raise RuntimeError("invalid error code when protecting a protected snapshot")
-        else:
-            raise RuntimeError("expected protection of already protected snapshot to fail")
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
-        # remove snapshot should fail since the snapshot is protected
-        try:
-            self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
-        except CommandFailedError as ce:
-            if ce.exitstatus != errno.EINVAL:
-                raise RuntimeError("invalid error code when removing a protected snapshot")
-        else:
-            raise RuntimeError("expected removal of protected snapshot to fail")
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
 
         # now, unprotect snapshot
         self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
@@ -1617,37 +1685,12 @@ class TestVolumes(CephFSTestCase):
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # remove subvolume
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
-
-        # verify trash dir is clean
-        self._wait_for_trash_empty()
-
-    def test_subvolume_snapshot_clone_unprotected_snapshot(self):
-        subvolume = self._generate_random_subvolume_name()
-        snapshot = self._generate_random_snapshot_name()
-        clone = self._generate_random_clone_name()
-
-        # create subvolume
-        self._fs_cmd("subvolume", "create", self.volname, subvolume)
-
-        # snapshot subvolume
-        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
-
-        # clone a non protected snapshot
-        try:
-            self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
-        except CommandFailedError as ce:
-            if ce.exitstatus != errno.EINVAL:
-                raise RuntimeError("invalid error code when cloning a non protected snapshot")
-        else:
-            raise RuntimeError("expected cloning of unprotected snapshot to fail")
-
-        # remove snapshot
-        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+        # verify clone
+        self._verify_clone(subvolume, clone)
 
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
 
         # verify trash dir is clean
         self._wait_for_trash_empty()
@@ -1666,27 +1709,12 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
-        # unprotecting when a clone is in progress should fail
-        try:
-            self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
-        except CommandFailedError as ce:
-            if ce.exitstatus != errno.EEXIST:
-                raise RuntimeError("invalid error code when unprotecting snapshot during clone")
-        else:
-            raise RuntimeError("expected unprotecting a snapshot to fail since it has pending clones")
-
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
@@ -1718,18 +1746,12 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone, "--pool_layout", new_pool)
 
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
@@ -1765,18 +1787,12 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
@@ -1804,18 +1820,12 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone1)
 
         # check clone status
         self._wait_for_clone_to_complete(clone1)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
@@ -1829,18 +1839,12 @@ class TestVolumes(CephFSTestCase):
         # snapshot clone -- use same snap name
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, clone1, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, clone1, snapshot)
-
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, clone1, snapshot, clone2)
 
         # check clone status
         self._wait_for_clone_to_complete(clone2)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, clone1, snapshot)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, clone1, snapshot)
 
@@ -1870,9 +1874,6 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
         # create group
         self._fs_cmd("subvolumegroup", "create", self.volname, group)
 
@@ -1882,9 +1883,6 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone, clone_group=group)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
@@ -1919,18 +1917,12 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot, group)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot, group)
-
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone, '--group_name', group)
 
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot, group)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, group)
 
@@ -1966,9 +1958,6 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot, s_group)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot, s_group)
-
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone,
                      '--group_name', s_group, '--target_group_name', c_group)
@@ -1976,9 +1965,6 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone, clone_group=c_group)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot, s_group)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, s_group)
 
@@ -2011,23 +1997,25 @@ class TestVolumes(CephFSTestCase):
         self.mount_a.run_shell(['mkdir', '-p', createpath])
 
         # do some IO
-        self._do_subvolume_io(subvolume, number_of_files=32)
+        self._do_subvolume_io(subvolume, number_of_files=64)
 
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
+        # snapshot should not be deletable now
+        try:
+            self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.EAGAIN, msg="invalid error code when removing source snapshot of a clone")
+        else:
+            self.fail("expected removing source snapshot of a clone to fail")
+
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
@@ -2055,9 +2043,6 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
@@ -2066,7 +2051,7 @@ class TestVolumes(CephFSTestCase):
             self._get_subvolume_path(self.volname, clone)
         except CommandFailedError as ce:
             if ce.exitstatus != errno.EAGAIN:
-                raise RuntimeError("invalid error code when cloning a non protected snapshot")
+                raise RuntimeError("invalid error code when fetching path of an pending clone")
         else:
             raise RuntimeError("expected fetching path of an pending clone to fail")
 
@@ -2077,8 +2062,50 @@ class TestVolumes(CephFSTestCase):
         subvolpath = self._get_subvolume_path(self.volname, clone)
         self.assertNotEqual(subvolpath, None)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # verify clone
+        self._verify_clone(subvolume, clone)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_clone_in_progress_snapshot_rm(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=64)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+        # snapshot should not be deletable now
+        try:
+            self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.EAGAIN, msg="invalid error code when removing source snapshot of a clone")
+        else:
+            self.fail("expected removing source snapshot of a clone to fail")
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
+        # clone should be accessible now
+        subvolpath = self._get_subvolume_path(self.volname, clone)
+        self.assertNotEqual(subvolpath, None)
 
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
@@ -2107,9 +2134,6 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
@@ -2128,9 +2152,6 @@ class TestVolumes(CephFSTestCase):
         subvolpath = self._get_subvolume_path(self.volname, clone)
         self.assertNotEqual(subvolpath, None)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
@@ -2179,9 +2200,6 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume1, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume1, snapshot)
-
         # schedule a clone with target as subvolume2
         try:
             self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume1, snapshot, subvolume2)
@@ -2205,9 +2223,6 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume1, snapshot)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume1, snapshot)
 
@@ -2240,9 +2255,6 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
         # add data pool
         new_pool = "new_pool"
         self.fs.add_data_pool(new_pool)
@@ -2268,9 +2280,6 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_fail(clone2)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
@@ -2305,18 +2314,12 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
@@ -2344,9 +2347,6 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
@@ -2356,9 +2356,6 @@ class TestVolumes(CephFSTestCase):
         # verify canceled state
         self._check_clone_canceled(clone)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
@@ -2398,9 +2395,6 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
         # schedule clones
         for clone in clones:
             self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
@@ -2426,9 +2420,6 @@ class TestVolumes(CephFSTestCase):
                 if ce.exitstatus != errno.EINVAL:
                     raise RuntimeError("invalid error code when cancelling on-going clone")
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
-
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)