]> git.proxmox.com Git - ceph.git/blobdiff - ceph/qa/tasks/cephfs/test_volumes.py
import ceph quincy 17.2.1
[ceph.git] / ceph / qa / tasks / cephfs / test_volumes.py
index e9d57a770bb73e3a70f074c3d0275c21f5b4808f..2ae4674dbca71343bc8ac7de448c3961e4626277 100644 (file)
@@ -9,6 +9,7 @@ import uuid
 import unittest
 from hashlib import md5
 from textwrap import dedent
+from io import StringIO
 
 from tasks.cephfs.cephfs_test_case import CephFSTestCase
 from tasks.cephfs.fuse_mount import FuseMount
@@ -53,12 +54,23 @@ class TestVolumesHelper(CephFSTestCase):
             time.sleep(1)
         self.assertTrue(check < timo)
 
+    def _get_clone_status(self, clone, clone_group=None):
+        args = ["clone", "status", self.volname, clone]
+        if clone_group:
+            args.append(clone_group)
+        args = tuple(args)
+        result = json.loads(self._fs_cmd(*args))
+        return result
+
     def _wait_for_clone_to_complete(self, clone, clone_group=None, timo=120):
         self.__check_clone_state("complete", clone, clone_group, timo)
 
     def _wait_for_clone_to_fail(self, clone, clone_group=None, timo=120):
         self.__check_clone_state("failed", clone, clone_group, timo)
 
+    def _wait_for_clone_to_be_in_progress(self, clone, clone_group=None, timo=120):
+        self.__check_clone_state("in-progress", clone, clone_group, timo)
+
     def _check_clone_canceled(self, clone, clone_group=None):
         self.__check_clone_state("canceled", clone, clone_group, timo=1)
 
@@ -2437,428 +2449,535 @@ class TestSubvolumes(TestVolumesHelper):
         # verify trash dir is clean
         self._wait_for_trash_empty()
 
-
-class TestSubvolumeGroupSnapshots(TestVolumesHelper):
-    """Tests for FS subvolume group snapshot operations."""
-    @unittest.skip("skipping subvolumegroup snapshot tests")
-    def test_nonexistent_subvolume_group_snapshot_rm(self):
+    def test_subvolume_retain_snapshot_rm_idempotency(self):
+        """
+        ensure subvolume deletion of a subvolume which is already deleted with retain snapshots option passes.
+        After subvolume deletion with retain snapshots, the subvolume exists until the trash directory (resides inside subvolume)
+        is cleaned up. The subvolume deletion issued while the trash directory is not empty, should pass and should
+        not error out with EAGAIN.
+        """
         subvolume = self._generate_random_subvolume_name()
-        group = self._generate_random_group_name()
         snapshot = self._generate_random_snapshot_name()
 
-        # create group
-        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
 
-        # create subvolume in group
-        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=256)
 
-        # snapshot group
-        self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # remove snapshot
-        self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot)
+        # remove with snapshot retention
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
 
-        # remove snapshot
+        # remove snapshots (removes retained volume)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove subvolume (check idempotency)
         try:
-            self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot)
+            self._fs_cmd("subvolume", "rm", self.volname, subvolume)
         except CommandFailedError as ce:
             if ce.exitstatus != errno.ENOENT:
-                raise
-        else:
-            raise RuntimeError("expected the 'fs subvolumegroup snapshot rm' command to fail")
-
-        # remove subvolume
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+                self.fail(f"expected subvolume rm to pass with error: {os.strerror(ce.exitstatus)}")
 
         # verify trash dir is clean
         self._wait_for_trash_empty()
 
-        # remove group
-        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-    @unittest.skip("skipping subvolumegroup snapshot tests")
-    def test_subvolume_group_snapshot_create_and_rm(self):
-        subvolume = self._generate_random_subvolume_name()
+    def test_subvolume_user_metadata_set(self):
+        subvolname = self._generate_random_subvolume_name()
         group = self._generate_random_group_name()
-        snapshot = self._generate_random_snapshot_name()
 
-        # create group
+        # create group.
         self._fs_cmd("subvolumegroup", "create", self.volname, group)
 
-        # create subvolume in group
-        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
-
-        # snapshot group
-        self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
 
-        # remove snapshot
-        self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot)
+        # set metadata for subvolume.
+        key = "key"
+        value = "value"
+        try:
+            self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume metadata set' command to succeed")
 
-        # remove subvolume
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-        # verify trash dir is clean
+        # verify trash dir is clean.
         self._wait_for_trash_empty()
 
-        # remove group
-        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
-
-    @unittest.skip("skipping subvolumegroup snapshot tests")
-    def test_subvolume_group_snapshot_idempotence(self):
-        subvolume = self._generate_random_subvolume_name()
+    def test_subvolume_user_metadata_set_idempotence(self):
+        subvolname = self._generate_random_subvolume_name()
         group = self._generate_random_group_name()
-        snapshot = self._generate_random_snapshot_name()
 
-        # create group
+        # create group.
         self._fs_cmd("subvolumegroup", "create", self.volname, group)
 
-        # create subvolume in group
-        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
-
-        # snapshot group
-        self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
-
-        # try creating snapshot w/ same snapshot name -- shoule be idempotent
-        self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
-
-        # remove snapshot
-        self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot)
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
 
-        # remove subvolume
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+        # set metadata for subvolume.
+        key = "key"
+        value = "value"
+        try:
+            self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume metadata set' command to succeed")
 
-        # verify trash dir is clean
-        self._wait_for_trash_empty()
+        # set same metadata again for subvolume.
+        try:
+            self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume metadata set' command to succeed because it is idempotent operation")
 
-        # remove group
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-    @unittest.skip("skipping subvolumegroup snapshot tests")
-    def test_subvolume_group_snapshot_ls(self):
-        # tests the 'fs subvolumegroup snapshot ls' command
-
-        snapshots = []
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
 
-        # create group
+    def test_subvolume_user_metadata_get(self):
+        subvolname = self._generate_random_subvolume_name()
         group = self._generate_random_group_name()
+
+        # create group.
         self._fs_cmd("subvolumegroup", "create", self.volname, group)
 
-        # create subvolumegroup snapshots
-        snapshots = self._generate_random_snapshot_name(3)
-        for snapshot in snapshots:
-            self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
 
-        subvolgrpsnapshotls = json.loads(self._fs_cmd('subvolumegroup', 'snapshot', 'ls', self.volname, group))
-        if len(subvolgrpsnapshotls) == 0:
-            raise RuntimeError("Expected the 'fs subvolumegroup snapshot ls' command to list the created subvolume group snapshots")
-        else:
-            snapshotnames = [snapshot['name'] for snapshot in subvolgrpsnapshotls]
-            if collections.Counter(snapshotnames) != collections.Counter(snapshots):
-                raise RuntimeError("Error creating or listing subvolume group snapshots")
+        # set metadata for subvolume.
+        key = "key"
+        value = "value"
+        self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
 
-    @unittest.skip("skipping subvolumegroup snapshot tests")
-    def test_subvolume_group_snapshot_rm_force(self):
-        # test removing non-existing subvolume group snapshot with --force
-        group = self._generate_random_group_name()
-        snapshot = self._generate_random_snapshot_name()
-        # remove snapshot
+        # get value for specified key.
         try:
-            self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot, "--force")
+            ret = self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, key, "--group_name", group)
         except CommandFailedError:
-            raise RuntimeError("expected the 'fs subvolumegroup snapshot rm --force' command to succeed")
-
-    def test_subvolume_group_snapshot_unsupported_status(self):
-        group = self._generate_random_group_name()
-        snapshot = self._generate_random_snapshot_name()
+            self.fail("expected the 'fs subvolume metadata get' command to succeed")
 
-        # create group
-        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+        # remove '\n' from returned value.
+        ret = ret.strip('\n')
 
-        # snapshot group
-        try:
-            self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
-        except CommandFailedError as ce:
-            self.assertEqual(ce.exitstatus, errno.ENOSYS, "invalid error code on subvolumegroup snapshot create")
-        else:
-            self.fail("expected subvolumegroup snapshot create command to fail")
+        # match received value with expected value.
+        self.assertEqual(value, ret)
 
-        # remove group
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
 
-class TestSubvolumeSnapshots(TestVolumesHelper):
-    """Tests for FS subvolume snapshot operations."""
-    def test_nonexistent_subvolume_snapshot_rm(self):
-        subvolume = self._generate_random_subvolume_name()
-        snapshot = self._generate_random_snapshot_name()
+    def test_subvolume_user_metadata_get_for_nonexisting_key(self):
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
 
-        # create subvolume
-        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
 
-        # snapshot subvolume
-        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
 
-        # remove snapshot
-        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+        # set metadata for subvolume.
+        key = "key"
+        value = "value"
+        self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
 
-        # remove snapshot again
+        # try to get value for nonexisting key
+        # Expecting ENOENT exit status because key does not exist
         try:
-            self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
-        except CommandFailedError as ce:
-            if ce.exitstatus != errno.ENOENT:
-                raise
+            self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, "key_nonexist", "--group_name", group)
+        except CommandFailedError as e:
+            self.assertEqual(e.exitstatus, errno.ENOENT)
         else:
-            raise RuntimeError("expected the 'fs subvolume snapshot rm' command to fail")
+            self.fail("Expected ENOENT because 'key_nonexist' does not exist")
 
-        # remove subvolume
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-        # verify trash dir is clean
+        # verify trash dir is clean.
         self._wait_for_trash_empty()
 
-    def test_subvolume_snapshot_create_and_rm(self):
-        subvolume = self._generate_random_subvolume_name()
-        snapshot = self._generate_random_snapshot_name()
+    def test_subvolume_user_metadata_get_for_nonexisting_section(self):
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
 
-        # create subvolume
-        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
 
-        # snapshot subvolume
-        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
 
-        # remove snapshot
-        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+        # try to get value for nonexisting key (as section does not exist)
+        # Expecting ENOENT exit status because key does not exist
+        try:
+            self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, "key", "--group_name", group)
+        except CommandFailedError as e:
+            self.assertEqual(e.exitstatus, errno.ENOENT)
+        else:
+            self.fail("Expected ENOENT because section does not exist")
 
-        # remove subvolume
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-        # verify trash dir is clean
+        # verify trash dir is clean.
         self._wait_for_trash_empty()
 
-    def test_subvolume_snapshot_create_idempotence(self):
-        subvolume = self._generate_random_subvolume_name()
-        snapshot = self._generate_random_snapshot_name()
+    def test_subvolume_user_metadata_update(self):
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
 
-        # create subvolume
-        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
 
-        # snapshot subvolume
-        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
 
-        # try creating w/ same subvolume snapshot name -- should be idempotent
-        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+        # set metadata for subvolume.
+        key = "key"
+        value = "value"
+        self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
 
-        # remove snapshot
-        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+        # update metadata against key.
+        new_value = "new_value"
+        self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, new_value, "--group_name", group)
 
-        # remove subvolume
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        # get metadata for specified key of subvolume.
+        try:
+            ret = self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, key, "--group_name", group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume metadata get' command to succeed")
 
-        # verify trash dir is clean
-        self._wait_for_trash_empty()
+        # remove '\n' from returned value.
+        ret = ret.strip('\n')
 
-    def test_subvolume_snapshot_info(self):
+        # match received value with expected value.
+        self.assertEqual(new_value, ret)
 
-        """
-        tests the 'fs subvolume snapshot info' command
-        """
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-        snap_md = ["created_at", "data_pool", "has_pending_clones", "size"]
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
 
-        subvolume = self._generate_random_subvolume_name()
-        snapshot, snap_missing = self._generate_random_snapshot_name(2)
+    def test_subvolume_user_metadata_list(self):
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
 
-        # create subvolume
-        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
 
-        # do some IO
-        self._do_subvolume_io(subvolume, number_of_files=1)
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
 
-        # snapshot subvolume
-        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+        # set metadata for subvolume.
+        input_metadata_dict =  {f'key_{i}' : f'value_{i}' for i in range(3)}
 
-        snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot))
-        for md in snap_md:
-            self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
-        self.assertEqual(snap_info["has_pending_clones"], "no")
+        for k, v in input_metadata_dict.items():
+            self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, k, v, "--group_name", group)
 
-        # snapshot info for non-existent snapshot
+        # list metadata
         try:
-            self._get_subvolume_snapshot_info(self.volname, subvolume, snap_missing)
-        except CommandFailedError as ce:
-            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on snapshot info of non-existent snapshot")
-        else:
-            self.fail("expected snapshot info of non-existent snapshot to fail")
+            ret = self._fs_cmd("subvolume", "metadata", "ls", self.volname, subvolname, "--group_name", group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume metadata ls' command to succeed")
 
-        # remove snapshot
-        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+        ret_dict = json.loads(ret)
 
-        # remove subvolume
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        # compare output with expected output
+        self.assertDictEqual(input_metadata_dict, ret_dict)
 
-        # verify trash dir is clean
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean.
         self._wait_for_trash_empty()
 
-    def test_subvolume_snapshot_in_group(self):
-        subvolume = self._generate_random_subvolume_name()
+    def test_subvolume_user_metadata_list_if_no_metadata_set(self):
+        subvolname = self._generate_random_subvolume_name()
         group = self._generate_random_group_name()
-        snapshot = self._generate_random_snapshot_name()
 
-        # create group
+        # create group.
         self._fs_cmd("subvolumegroup", "create", self.volname, group)
 
-        # create subvolume in group
-        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
 
-        # snapshot subvolume in group
-        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot, group)
+        # list metadata
+        try:
+            ret = self._fs_cmd("subvolume", "metadata", "ls", self.volname, subvolname, "--group_name", group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume metadata ls' command to succeed")
 
-        # remove snapshot
-        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, group)
+        # remove '\n' from returned value.
+        ret = ret.strip('\n')
 
-        # remove subvolume
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+        # compare output with expected output
+        # expecting empty json/dictionary
+        self.assertEqual(ret, "{}")
 
-        # verify trash dir is clean
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean.
         self._wait_for_trash_empty()
 
-        # remove group
-        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+    def test_subvolume_user_metadata_remove(self):
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
 
-    def test_subvolume_snapshot_ls(self):
-        # tests the 'fs subvolume snapshot ls' command
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
 
-        snapshots = []
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
 
-        # create subvolume
-        subvolume = self._generate_random_subvolume_name()
-        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+        # set metadata for subvolume.
+        key = "key"
+        value = "value"
+        self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
 
-        # create subvolume snapshots
-        snapshots = self._generate_random_snapshot_name(3)
-        for snapshot in snapshots:
-            self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+        # remove metadata against specified key.
+        try:
+            self._fs_cmd("subvolume", "metadata", "rm", self.volname, subvolname, key, "--group_name", group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume metadata rm' command to succeed")
 
-        subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume))
-        if len(subvolsnapshotls) == 0:
-            self.fail("Expected the 'fs subvolume snapshot ls' command to list the created subvolume snapshots")
+        # confirm key is removed by again fetching metadata
+        try:
+            self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, key, "--group_name", group)
+        except CommandFailedError as e:
+            self.assertEqual(e.exitstatus, errno.ENOENT)
         else:
-            snapshotnames = [snapshot['name'] for snapshot in subvolsnapshotls]
-            if collections.Counter(snapshotnames) != collections.Counter(snapshots):
-                self.fail("Error creating or listing subvolume snapshots")
-
-        # remove snapshot
-        for snapshot in snapshots:
-            self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+            self.fail("Expected ENOENT because key does not exist")
 
-        # remove subvolume
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-        # verify trash dir is clean
+        # verify trash dir is clean.
         self._wait_for_trash_empty()
 
-    def test_subvolume_inherited_snapshot_ls(self):
-        # tests the scenario where 'fs subvolume snapshot ls' command
-        # should not list inherited snapshots created as part of snapshot
-        # at ancestral level
-
-        snapshots = []
-        subvolume = self._generate_random_subvolume_name()
+    def test_subvolume_user_metadata_remove_for_nonexisting_key(self):
+        subvolname = self._generate_random_subvolume_name()
         group = self._generate_random_group_name()
-        snap_count = 3
 
-        # create group
+        # create group.
         self._fs_cmd("subvolumegroup", "create", self.volname, group)
 
-        # create subvolume in group
-        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
 
-        # create subvolume snapshots
-        snapshots = self._generate_random_snapshot_name(snap_count)
-        for snapshot in snapshots:
-            self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot, group)
+        # set metadata for subvolume.
+        key = "key"
+        value = "value"
+        self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
 
-        # Create snapshot at ancestral level
-        ancestral_snappath1 = os.path.join(".", "volumes", group, ".snap", "ancestral_snap_1")
-        ancestral_snappath2 = os.path.join(".", "volumes", group, ".snap", "ancestral_snap_2")
-        self.mount_a.run_shell(['mkdir', '-p', ancestral_snappath1, ancestral_snappath2], sudo=True)
+        # try to remove value for nonexisting key
+        # Expecting ENOENT exit status because key does not exist
+        try:
+            self._fs_cmd("subvolume", "metadata", "rm", self.volname, subvolname, "key_nonexist", "--group_name", group)
+        except CommandFailedError as e:
+            self.assertEqual(e.exitstatus, errno.ENOENT)
+        else:
+            self.fail("Expected ENOENT because 'key_nonexist' does not exist")
 
-        subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume, group))
-        self.assertEqual(len(subvolsnapshotls), snap_count)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-        # remove ancestral snapshots
-        self.mount_a.run_shell(['rmdir', ancestral_snappath1, ancestral_snappath2], sudo=True)
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
 
-        # remove snapshot
-        for snapshot in snapshots:
-            self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, group)
+    def test_subvolume_user_metadata_remove_for_nonexisting_section(self):
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
 
-        # remove subvolume
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
 
-        # verify trash dir is clean
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
+
+        # try to remove value for nonexisting key (as section does not exist)
+        # Expecting ENOENT exit status because key does not exist
+        try:
+            self._fs_cmd("subvolume", "metadata", "rm", self.volname, subvolname, "key", "--group_name", group)
+        except CommandFailedError as e:
+            self.assertEqual(e.exitstatus, errno.ENOENT)
+        else:
+            self.fail("Expected ENOENT because section does not exist")
+
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean.
         self._wait_for_trash_empty()
 
-        # remove group
+    def test_subvolume_user_metadata_remove_force(self):
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
+
+        # set metadata for subvolume.
+        key = "key"
+        value = "value"
+        self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
+
+        # remove metadata against specified key with --force option.
+        try:
+            self._fs_cmd("subvolume", "metadata", "rm", self.volname, subvolname, key, "--group_name", group, "--force")
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume metadata rm' command to succeed")
+
+        # confirm key is removed by again fetching metadata
+        try:
+            self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, key, "--group_name", group)
+        except CommandFailedError as e:
+            self.assertEqual(e.exitstatus, errno.ENOENT)
+        else:
+            self.fail("Expected ENOENT because key does not exist")
+
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-    def test_subvolume_inherited_snapshot_info(self):
-        """
-        tests the scenario where 'fs subvolume snapshot info' command
-        should fail for inherited snapshots created as part of snapshot
-        at ancestral level
-        """
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
 
-        subvolume = self._generate_random_subvolume_name()
+    def test_subvolume_user_metadata_remove_force_for_nonexisting_key(self):
+        subvolname = self._generate_random_subvolume_name()
         group = self._generate_random_group_name()
 
-        # create group
+        # create group.
         self._fs_cmd("subvolumegroup", "create", self.volname, group)
 
-        # create subvolume in group
-        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
 
-        # Create snapshot at ancestral level
-        ancestral_snap_name = "ancestral_snap_1"
-        ancestral_snappath1 = os.path.join(".", "volumes", group, ".snap", ancestral_snap_name)
-        self.mount_a.run_shell(['mkdir', '-p', ancestral_snappath1], sudo=True)
+        # set metadata for subvolume.
+        key = "key"
+        value = "value"
+        self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
 
-        # Validate existence of inherited snapshot
-        group_path = os.path.join(".", "volumes", group)
-        inode_number_group_dir = int(self.mount_a.run_shell(['stat', '-c' '%i', group_path]).stdout.getvalue().strip())
-        inherited_snap = "_{0}_{1}".format(ancestral_snap_name, inode_number_group_dir)
-        inherited_snappath = os.path.join(".", "volumes", group, subvolume,".snap", inherited_snap)
-        self.mount_a.run_shell(['ls', inherited_snappath])
+        # remove metadata against specified key.
+        try:
+            self._fs_cmd("subvolume", "metadata", "rm", self.volname, subvolname, key, "--group_name", group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume metadata rm' command to succeed")
 
-        # snapshot info on inherited snapshot
+        # confirm key is removed by again fetching metadata
         try:
-            self._get_subvolume_snapshot_info(self.volname, subvolume, inherited_snap, group)
-        except CommandFailedError as ce:
-            self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on snapshot info of inherited snapshot")
+            self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, key, "--group_name", group)
+        except CommandFailedError as e:
+            self.assertEqual(e.exitstatus, errno.ENOENT)
         else:
-            self.fail("expected snapshot info of inherited snapshot to fail")
+            self.fail("Expected ENOENT because key does not exist")
 
-        # remove ancestral snapshots
-        self.mount_a.run_shell(['rmdir', ancestral_snappath1], sudo=True)
+        # again remove metadata against already removed key with --force option.
+        try:
+            self._fs_cmd("subvolume", "metadata", "rm", self.volname, subvolname, key, "--group_name", group, "--force")
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume metadata rm' (with --force) command to succeed")
 
-        # remove subvolume
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--group_name", group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-        # verify trash dir is clean
+        # verify trash dir is clean.
         self._wait_for_trash_empty()
 
-        # remove group
+    def test_subvolume_user_metadata_set_and_get_for_legacy_subvolume(self):
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+
+        # emulate a old-fashioned subvolume in a custom group
+        createpath = os.path.join(".", "volumes", group, subvolname)
+        self.mount_a.run_shell(['mkdir', '-p', createpath], sudo=True)
+
+        # set metadata for subvolume.
+        key = "key"
+        value = "value"
+        try:
+            self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume metadata set' command to succeed")
+
+        # get value for specified key.
+        try:
+            ret = self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, key, "--group_name", group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume metadata get' command to succeed")
+
+        # remove '\n' from returned value.
+        ret = ret.strip('\n')
+
+        # match received value with expected value.
+        self.assertEqual(value, ret)
+
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-    def test_subvolume_inherited_snapshot_rm(self):
-        """
-        tests the scenario where 'fs subvolume snapshot rm' command
-        should fail for inherited snapshots created as part of snapshot
-        at ancestral level
-        """
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
+
+    def test_subvolume_user_metadata_list_and_remove_for_legacy_subvolume(self):
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+
+        # emulate a old-fashioned subvolume in a custom group
+        createpath = os.path.join(".", "volumes", group, subvolname)
+        self.mount_a.run_shell(['mkdir', '-p', createpath], sudo=True)
+
+        # set metadata for subvolume.
+        input_metadata_dict =  {f'key_{i}' : f'value_{i}' for i in range(3)}
+
+        for k, v in input_metadata_dict.items():
+            self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, k, v, "--group_name", group)
+
+        # list metadata
+        try:
+            ret = self._fs_cmd("subvolume", "metadata", "ls", self.volname, subvolname, "--group_name", group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume metadata ls' command to succeed")
+
+        ret_dict = json.loads(ret)
+
+        # compare output with expected output
+        self.assertDictEqual(input_metadata_dict, ret_dict)
+
+        # remove metadata against specified key.
+        try:
+            self._fs_cmd("subvolume", "metadata", "rm", self.volname, subvolname, "key_1", "--group_name", group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume metadata rm' command to succeed")
+
+        # confirm key is removed by again fetching metadata
+        try:
+            self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, "key_1", "--group_name", group)
+        except CommandFailedError as e:
+            self.assertEqual(e.exitstatus, errno.ENOENT)
+        else:
+            self.fail("Expected ENOENT because key_1 does not exist")
+
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
+
+class TestSubvolumeGroupSnapshots(TestVolumesHelper):
+    """Tests for FS subvolume group snapshot operations."""
+    @unittest.skip("skipping subvolumegroup snapshot tests")
+    def test_nonexistent_subvolume_group_snapshot_rm(self):
         subvolume = self._generate_random_subvolume_name()
         group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
 
         # create group
         self._fs_cmd("subvolumegroup", "create", self.volname, group)
@@ -2866,28 +2985,20 @@ class TestSubvolumeSnapshots(TestVolumesHelper):
         # create subvolume in group
         self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
 
-        # Create snapshot at ancestral level
-        ancestral_snap_name = "ancestral_snap_1"
-        ancestral_snappath1 = os.path.join(".", "volumes", group, ".snap", ancestral_snap_name)
-        self.mount_a.run_shell(['mkdir', '-p', ancestral_snappath1], sudo=True)
+        # snapshot group
+        self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
 
-        # Validate existence of inherited snap
-        group_path = os.path.join(".", "volumes", group)
-        inode_number_group_dir = int(self.mount_a.run_shell(['stat', '-c' '%i', group_path]).stdout.getvalue().strip())
-        inherited_snap = "_{0}_{1}".format(ancestral_snap_name, inode_number_group_dir)
-        inherited_snappath = os.path.join(".", "volumes", group, subvolume,".snap", inherited_snap)
-        self.mount_a.run_shell(['ls', inherited_snappath])
+        # remove snapshot
+        self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot)
 
-        # inherited snapshot should not be deletable
+        # remove snapshot
         try:
-            self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, inherited_snap, "--group_name", group)
+            self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot)
         except CommandFailedError as ce:
-            self.assertEqual(ce.exitstatus, errno.EINVAL, msg="invalid error code when removing inherited snapshot")
+            if ce.exitstatus != errno.ENOENT:
+                raise
         else:
-            self.fail("expected removing inheirted snapshot to fail")
-
-        # remove ancestral snapshots
-        self.mount_a.run_shell(['rmdir', ancestral_snappath1], sudo=True)
+            raise RuntimeError("expected the 'fs subvolumegroup snapshot rm' command to fail")
 
         # remove subvolume
         self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
@@ -2898,16 +3009,11 @@ class TestSubvolumeSnapshots(TestVolumesHelper):
         # remove group
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-    def test_subvolume_subvolumegroup_snapshot_name_conflict(self):
-        """
-        tests the scenario where creation of subvolume snapshot name
-        with same name as it's subvolumegroup snapshot name. This should
-        fail.
-        """
-
+    @unittest.skip("skipping subvolumegroup snapshot tests")
+    def test_subvolume_group_snapshot_create_and_rm(self):
         subvolume = self._generate_random_subvolume_name()
         group = self._generate_random_group_name()
-        group_snapshot = self._generate_random_snapshot_name()
+        snapshot = self._generate_random_snapshot_name()
 
         # create group
         self._fs_cmd("subvolumegroup", "create", self.volname, group)
@@ -2915,23 +3021,11 @@ class TestSubvolumeSnapshots(TestVolumesHelper):
         # create subvolume in group
         self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
 
-        # Create subvolumegroup snapshot
-        group_snapshot_path = os.path.join(".", "volumes", group, ".snap", group_snapshot)
-        self.mount_a.run_shell(['mkdir', '-p', group_snapshot_path], sudo=True)
-
-        # Validate existence of subvolumegroup snapshot
-        self.mount_a.run_shell(['ls', group_snapshot_path])
-
-        # Creation of subvolume snapshot with it's subvolumegroup snapshot name should fail
-        try:
-            self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, group_snapshot, "--group_name", group)
-        except CommandFailedError as ce:
-            self.assertEqual(ce.exitstatus, errno.EINVAL, msg="invalid error code when creating subvolume snapshot with same name as subvolume group snapshot")
-        else:
-            self.fail("expected subvolume snapshot creation with same name as subvolumegroup snapshot to fail")
+        # snapshot group
+        self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
 
-        # remove subvolumegroup snapshot
-        self.mount_a.run_shell(['rmdir', group_snapshot_path], sudo=True)
+        # remove snapshot
+        self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot)
 
         # remove subvolume
         self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
@@ -2942,352 +3036,1368 @@ class TestSubvolumeSnapshots(TestVolumesHelper):
         # remove group
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-    def test_subvolume_retain_snapshot_invalid_recreate(self):
-        """
-        ensure retained subvolume recreate does not leave any incarnations in the subvolume and trash
-        """
+    @unittest.skip("skipping subvolumegroup snapshot tests")
+    def test_subvolume_group_snapshot_idempotence(self):
         subvolume = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
         snapshot = self._generate_random_snapshot_name()
 
-        # create subvolume
-        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+        # create group
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
 
-        # snapshot subvolume
-        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+        # create subvolume in group
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
 
-        # remove with snapshot retention
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+        # snapshot group
+        self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
 
-        # recreate subvolume with an invalid pool
-        data_pool = "invalid_pool"
+        # try creating snapshot w/ same snapshot name -- shoule be idempotent
+        self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
+
+        # remove snapshot
+        self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot)
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+        # remove group
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+    @unittest.skip("skipping subvolumegroup snapshot tests")
+    def test_subvolume_group_snapshot_ls(self):
+        # tests the 'fs subvolumegroup snapshot ls' command
+
+        snapshots = []
+
+        # create group
+        group = self._generate_random_group_name()
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolumegroup snapshots
+        snapshots = self._generate_random_snapshot_name(3)
+        for snapshot in snapshots:
+            self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
+
+        subvolgrpsnapshotls = json.loads(self._fs_cmd('subvolumegroup', 'snapshot', 'ls', self.volname, group))
+        if len(subvolgrpsnapshotls) == 0:
+            raise RuntimeError("Expected the 'fs subvolumegroup snapshot ls' command to list the created subvolume group snapshots")
+        else:
+            snapshotnames = [snapshot['name'] for snapshot in subvolgrpsnapshotls]
+            if collections.Counter(snapshotnames) != collections.Counter(snapshots):
+                raise RuntimeError("Error creating or listing subvolume group snapshots")
+
+    @unittest.skip("skipping subvolumegroup snapshot tests")
+    def test_subvolume_group_snapshot_rm_force(self):
+        # test removing non-existing subvolume group snapshot with --force
+        group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
+        # remove snapshot
         try:
-            self._fs_cmd("subvolume", "create", self.volname, subvolume, "--pool_layout", data_pool)
+            self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot, "--force")
+        except CommandFailedError:
+            raise RuntimeError("expected the 'fs subvolumegroup snapshot rm --force' command to succeed")
+
+    def test_subvolume_group_snapshot_unsupported_status(self):
+        group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create group
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # snapshot group
+        try:
+            self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
         except CommandFailedError as ce:
-            self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on recreate of subvolume with invalid poolname")
+            self.assertEqual(ce.exitstatus, errno.ENOSYS, "invalid error code on subvolumegroup snapshot create")
         else:
-            self.fail("expected recreate of subvolume with invalid poolname to fail")
+            self.fail("expected subvolumegroup snapshot create command to fail")
 
-        # fetch info
-        subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
-        self.assertEqual(subvol_info["state"], "snapshot-retained",
-                         msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+        # remove group
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-        # getpath
+
+class TestSubvolumeSnapshots(TestVolumesHelper):
+    """Tests for FS subvolume snapshot operations."""
+    def test_nonexistent_subvolume_snapshot_rm(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove snapshot again
         try:
-            self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
+            self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
         except CommandFailedError as ce:
-            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on getpath of subvolume with retained snapshots")
+            if ce.exitstatus != errno.ENOENT:
+                raise
         else:
-            self.fail("expected getpath of subvolume with retained snapshots to fail")
+            raise RuntimeError("expected the 'fs subvolume snapshot rm' command to fail")
 
-        # remove snapshot (should remove volume)
-        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
 
         # verify trash dir is clean
         self._wait_for_trash_empty()
 
-    def test_subvolume_retain_snapshot_recreate_subvolume(self):
-        """
-        ensure a retained subvolume can be recreated and further snapshotted
-        """
-        snap_md = ["created_at", "data_pool", "has_pending_clones", "size"]
-
+    def test_subvolume_snapshot_create_and_rm(self):
         subvolume = self._generate_random_subvolume_name()
-        snapshot1, snapshot2 = self._generate_random_snapshot_name(2)
+        snapshot = self._generate_random_snapshot_name()
 
         # create subvolume
         self._fs_cmd("subvolume", "create", self.volname, subvolume)
 
         # snapshot subvolume
-        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot1)
-
-        # remove with snapshot retention
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # fetch info
-        subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
-        self.assertEqual(subvol_info["state"], "snapshot-retained",
-                         msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # recreate retained subvolume
-        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
 
-        # fetch info
-        subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
-        self.assertEqual(subvol_info["state"], "complete",
-                         msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
 
-        # snapshot info (older snapshot)
-        snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot1))
-        for md in snap_md:
-            self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
-        self.assertEqual(snap_info["has_pending_clones"], "no")
+    def test_subvolume_snapshot_create_idempotence(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
 
-        # snap-create (new snapshot)
-        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot2)
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
 
-        # remove with retain snapshots
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # list snapshots
-        subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume))
-        self.assertEqual(len(subvolsnapshotls), 2, "Expected the 'fs subvolume snapshot ls' command to list the"
-                         " created subvolume snapshots")
-        snapshotnames = [snapshot['name'] for snapshot in subvolsnapshotls]
-        for snap in [snapshot1, snapshot2]:
-            self.assertIn(snap, snapshotnames, "Missing snapshot '{0}' in snapshot list".format(snap))
+        # try creating w/ same subvolume snapshot name -- should be idempotent
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # remove snapshots (should remove volume)
-        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot1)
-        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot2)
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify list subvolumes returns an empty list
-        subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
-        self.assertEqual(len(subvolumels), 0)
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
 
         # verify trash dir is clean
         self._wait_for_trash_empty()
 
-    def test_subvolume_retain_snapshot_with_snapshots(self):
+    def test_subvolume_snapshot_info(self):
+
         """
-        ensure retain snapshots based delete of a subvolume with snapshots retains the subvolume
-        also test allowed and dis-allowed operations on a retained subvolume
+        tests the 'fs subvolume snapshot info' command
         """
+
         snap_md = ["created_at", "data_pool", "has_pending_clones", "size"]
 
         subvolume = self._generate_random_subvolume_name()
-        snapshot = self._generate_random_snapshot_name()
+        snapshot, snap_missing = self._generate_random_snapshot_name(2)
 
         # create subvolume
-        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=1)
 
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # remove subvolume -- should fail with ENOTEMPTY since it has snapshots
-        try:
-            self._fs_cmd("subvolume", "rm", self.volname, subvolume)
-        except CommandFailedError as ce:
-            self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of retained subvolume with snapshots")
-        else:
-            self.fail("expected rm of subvolume with retained snapshots to fail")
-
-        # remove with snapshot retention
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
-
-        # fetch info
-        subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
-        self.assertEqual(subvol_info["state"], "snapshot-retained",
-                         msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
-
-        ## test allowed ops in retained state
-        # ls
-        subvolumes = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
-        self.assertEqual(len(subvolumes), 1, "subvolume ls count mismatch, expected '1', found {0}".format(len(subvolumes)))
-        self.assertEqual(subvolumes[0]['name'], subvolume,
-                         "subvolume name mismatch in ls output, expected '{0}', found '{1}'".format(subvolume, subvolumes[0]['name']))
-
-        # snapshot info
         snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot))
         for md in snap_md:
             self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
         self.assertEqual(snap_info["has_pending_clones"], "no")
 
-        # rm --force (allowed but should fail)
-        try:
-            self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--force")
-        except CommandFailedError as ce:
-            self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of subvolume with retained snapshots")
-        else:
-            self.fail("expected rm of subvolume with retained snapshots to fail")
-
-        # rm (allowed but should fail)
-        try:
-            self._fs_cmd("subvolume", "rm", self.volname, subvolume)
-        except CommandFailedError as ce:
-            self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of subvolume with retained snapshots")
-        else:
-            self.fail("expected rm of subvolume with retained snapshots to fail")
-
-        ## test disallowed ops
-        # getpath
-        try:
-            self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
-        except CommandFailedError as ce:
-            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on getpath of subvolume with retained snapshots")
-        else:
-            self.fail("expected getpath of subvolume with retained snapshots to fail")
-
-        # resize
-        nsize = self.DEFAULT_FILE_SIZE*1024*1024
-        try:
-            self._fs_cmd("subvolume", "resize", self.volname, subvolume, str(nsize))
-        except CommandFailedError as ce:
-            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on resize of subvolume with retained snapshots")
-        else:
-            self.fail("expected resize of subvolume with retained snapshots to fail")
-
-        # snap-create
+        # snapshot info for non-existent snapshot
         try:
-            self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, "fail")
+            self._get_subvolume_snapshot_info(self.volname, subvolume, snap_missing)
         except CommandFailedError as ce:
-            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on snapshot create of subvolume with retained snapshots")
+            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on snapshot info of non-existent snapshot")
         else:
-            self.fail("expected snapshot create of subvolume with retained snapshots to fail")
+            self.fail("expected snapshot info of non-existent snapshot to fail")
 
-        # remove snapshot (should remove volume)
+        # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify list subvolumes returns an empty list
-        subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
-        self.assertEqual(len(subvolumels), 0)
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
 
         # verify trash dir is clean
         self._wait_for_trash_empty()
 
-    def test_subvolume_retain_snapshot_without_snapshots(self):
+    def test_subvolume_snapshot_in_group(self):
+        subvolume = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create group
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+
+        # snapshot subvolume in group
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot, group)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, group)
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+        # remove group
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+    def test_subvolume_snapshot_ls(self):
+        # tests the 'fs subvolume snapshot ls' command
+
+        snapshots = []
+
+        # create subvolume
+        subvolume = self._generate_random_subvolume_name()
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # create subvolume snapshots
+        snapshots = self._generate_random_snapshot_name(3)
+        for snapshot in snapshots:
+            self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume))
+        if len(subvolsnapshotls) == 0:
+            self.fail("Expected the 'fs subvolume snapshot ls' command to list the created subvolume snapshots")
+        else:
+            snapshotnames = [snapshot['name'] for snapshot in subvolsnapshotls]
+            if collections.Counter(snapshotnames) != collections.Counter(snapshots):
+                self.fail("Error creating or listing subvolume snapshots")
+
+        # remove snapshot
+        for snapshot in snapshots:
+            self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_inherited_snapshot_ls(self):
+        # tests the scenario where 'fs subvolume snapshot ls' command
+        # should not list inherited snapshots created as part of snapshot
+        # at ancestral level
+
+        snapshots = []
+        subvolume = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+        snap_count = 3
+
+        # create group
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+
+        # create subvolume snapshots
+        snapshots = self._generate_random_snapshot_name(snap_count)
+        for snapshot in snapshots:
+            self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot, group)
+
+        # Create snapshot at ancestral level
+        ancestral_snappath1 = os.path.join(".", "volumes", group, ".snap", "ancestral_snap_1")
+        ancestral_snappath2 = os.path.join(".", "volumes", group, ".snap", "ancestral_snap_2")
+        self.mount_a.run_shell(['mkdir', '-p', ancestral_snappath1, ancestral_snappath2], sudo=True)
+
+        subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume, group))
+        self.assertEqual(len(subvolsnapshotls), snap_count)
+
+        # remove ancestral snapshots
+        self.mount_a.run_shell(['rmdir', ancestral_snappath1, ancestral_snappath2], sudo=True)
+
+        # remove snapshot
+        for snapshot in snapshots:
+            self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, group)
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+        # remove group
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+    def test_subvolume_inherited_snapshot_info(self):
+        """
+        tests the scenario where 'fs subvolume snapshot info' command
+        should fail for inherited snapshots created as part of snapshot
+        at ancestral level
+        """
+
+        subvolume = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+
+        # create group
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+
+        # Create snapshot at ancestral level
+        ancestral_snap_name = "ancestral_snap_1"
+        ancestral_snappath1 = os.path.join(".", "volumes", group, ".snap", ancestral_snap_name)
+        self.mount_a.run_shell(['mkdir', '-p', ancestral_snappath1], sudo=True)
+
+        # Validate existence of inherited snapshot
+        group_path = os.path.join(".", "volumes", group)
+        inode_number_group_dir = int(self.mount_a.run_shell(['stat', '-c' '%i', group_path]).stdout.getvalue().strip())
+        inherited_snap = "_{0}_{1}".format(ancestral_snap_name, inode_number_group_dir)
+        inherited_snappath = os.path.join(".", "volumes", group, subvolume,".snap", inherited_snap)
+        self.mount_a.run_shell(['ls', inherited_snappath])
+
+        # snapshot info on inherited snapshot
+        try:
+            self._get_subvolume_snapshot_info(self.volname, subvolume, inherited_snap, group)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on snapshot info of inherited snapshot")
+        else:
+            self.fail("expected snapshot info of inherited snapshot to fail")
+
+        # remove ancestral snapshots
+        self.mount_a.run_shell(['rmdir', ancestral_snappath1], sudo=True)
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--group_name", group)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+        # remove group
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+    def test_subvolume_inherited_snapshot_rm(self):
+        """
+        tests the scenario where 'fs subvolume snapshot rm' command
+        should fail for inherited snapshots created as part of snapshot
+        at ancestral level
+        """
+
+        subvolume = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+
+        # create group
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+
+        # Create snapshot at ancestral level
+        ancestral_snap_name = "ancestral_snap_1"
+        ancestral_snappath1 = os.path.join(".", "volumes", group, ".snap", ancestral_snap_name)
+        self.mount_a.run_shell(['mkdir', '-p', ancestral_snappath1], sudo=True)
+
+        # Validate existence of inherited snap
+        group_path = os.path.join(".", "volumes", group)
+        inode_number_group_dir = int(self.mount_a.run_shell(['stat', '-c' '%i', group_path]).stdout.getvalue().strip())
+        inherited_snap = "_{0}_{1}".format(ancestral_snap_name, inode_number_group_dir)
+        inherited_snappath = os.path.join(".", "volumes", group, subvolume,".snap", inherited_snap)
+        self.mount_a.run_shell(['ls', inherited_snappath])
+
+        # inherited snapshot should not be deletable
+        try:
+            self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, inherited_snap, "--group_name", group)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.EINVAL, msg="invalid error code when removing inherited snapshot")
+        else:
+            self.fail("expected removing inheirted snapshot to fail")
+
+        # remove ancestral snapshots
+        self.mount_a.run_shell(['rmdir', ancestral_snappath1], sudo=True)
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+        # remove group
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+    def test_subvolume_subvolumegroup_snapshot_name_conflict(self):
+        """
+        tests the scenario where creation of subvolume snapshot name
+        with same name as it's subvolumegroup snapshot name. This should
+        fail.
+        """
+
+        subvolume = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+        group_snapshot = self._generate_random_snapshot_name()
+
+        # create group
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+
+        # Create subvolumegroup snapshot
+        group_snapshot_path = os.path.join(".", "volumes", group, ".snap", group_snapshot)
+        self.mount_a.run_shell(['mkdir', '-p', group_snapshot_path], sudo=True)
+
+        # Validate existence of subvolumegroup snapshot
+        self.mount_a.run_shell(['ls', group_snapshot_path])
+
+        # Creation of subvolume snapshot with it's subvolumegroup snapshot name should fail
+        try:
+            self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, group_snapshot, "--group_name", group)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.EINVAL, msg="invalid error code when creating subvolume snapshot with same name as subvolume group snapshot")
+        else:
+            self.fail("expected subvolume snapshot creation with same name as subvolumegroup snapshot to fail")
+
+        # remove subvolumegroup snapshot
+        self.mount_a.run_shell(['rmdir', group_snapshot_path], sudo=True)
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+        # remove group
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+    def test_subvolume_retain_snapshot_invalid_recreate(self):
+        """
+        ensure retained subvolume recreate does not leave any incarnations in the subvolume and trash
+        """
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # remove with snapshot retention
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # recreate subvolume with an invalid pool
+        data_pool = "invalid_pool"
+        try:
+            self._fs_cmd("subvolume", "create", self.volname, subvolume, "--pool_layout", data_pool)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on recreate of subvolume with invalid poolname")
+        else:
+            self.fail("expected recreate of subvolume with invalid poolname to fail")
+
+        # fetch info
+        subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
+        self.assertEqual(subvol_info["state"], "snapshot-retained",
+                         msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+
+        # getpath
+        try:
+            self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on getpath of subvolume with retained snapshots")
+        else:
+            self.fail("expected getpath of subvolume with retained snapshots to fail")
+
+        # remove snapshot (should remove volume)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_retain_snapshot_recreate_subvolume(self):
+        """
+        ensure a retained subvolume can be recreated and further snapshotted
+        """
+        snap_md = ["created_at", "data_pool", "has_pending_clones", "size"]
+
+        subvolume = self._generate_random_subvolume_name()
+        snapshot1, snapshot2 = self._generate_random_snapshot_name(2)
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot1)
+
+        # remove with snapshot retention
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # fetch info
+        subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
+        self.assertEqual(subvol_info["state"], "snapshot-retained",
+                         msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+
+        # recreate retained subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # fetch info
+        subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
+        self.assertEqual(subvol_info["state"], "complete",
+                         msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+
+        # snapshot info (older snapshot)
+        snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot1))
+        for md in snap_md:
+            self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
+        self.assertEqual(snap_info["has_pending_clones"], "no")
+
+        # snap-create (new snapshot)
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot2)
+
+        # remove with retain snapshots
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # list snapshots
+        subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume))
+        self.assertEqual(len(subvolsnapshotls), 2, "Expected the 'fs subvolume snapshot ls' command to list the"
+                         " created subvolume snapshots")
+        snapshotnames = [snapshot['name'] for snapshot in subvolsnapshotls]
+        for snap in [snapshot1, snapshot2]:
+            self.assertIn(snap, snapshotnames, "Missing snapshot '{0}' in snapshot list".format(snap))
+
+        # remove snapshots (should remove volume)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot1)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot2)
+
+        # verify list subvolumes returns an empty list
+        subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+        self.assertEqual(len(subvolumels), 0)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_retain_snapshot_with_snapshots(self):
+        """
+        ensure retain snapshots based delete of a subvolume with snapshots retains the subvolume
+        also test allowed and dis-allowed operations on a retained subvolume
+        """
+        snap_md = ["created_at", "data_pool", "has_pending_clones", "size"]
+
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # remove subvolume -- should fail with ENOTEMPTY since it has snapshots
+        try:
+            self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of retained subvolume with snapshots")
+        else:
+            self.fail("expected rm of subvolume with retained snapshots to fail")
+
+        # remove with snapshot retention
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # fetch info
+        subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
+        self.assertEqual(subvol_info["state"], "snapshot-retained",
+                         msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+
+        ## test allowed ops in retained state
+        # ls
+        subvolumes = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+        self.assertEqual(len(subvolumes), 1, "subvolume ls count mismatch, expected '1', found {0}".format(len(subvolumes)))
+        self.assertEqual(subvolumes[0]['name'], subvolume,
+                         "subvolume name mismatch in ls output, expected '{0}', found '{1}'".format(subvolume, subvolumes[0]['name']))
+
+        # snapshot info
+        snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot))
+        for md in snap_md:
+            self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
+        self.assertEqual(snap_info["has_pending_clones"], "no")
+
+        # rm --force (allowed but should fail)
+        try:
+            self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--force")
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of subvolume with retained snapshots")
+        else:
+            self.fail("expected rm of subvolume with retained snapshots to fail")
+
+        # rm (allowed but should fail)
+        try:
+            self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of subvolume with retained snapshots")
+        else:
+            self.fail("expected rm of subvolume with retained snapshots to fail")
+
+        ## test disallowed ops
+        # getpath
+        try:
+            self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on getpath of subvolume with retained snapshots")
+        else:
+            self.fail("expected getpath of subvolume with retained snapshots to fail")
+
+        # resize
+        nsize = self.DEFAULT_FILE_SIZE*1024*1024
+        try:
+            self._fs_cmd("subvolume", "resize", self.volname, subvolume, str(nsize))
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on resize of subvolume with retained snapshots")
+        else:
+            self.fail("expected resize of subvolume with retained snapshots to fail")
+
+        # snap-create
+        try:
+            self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, "fail")
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on snapshot create of subvolume with retained snapshots")
+        else:
+            self.fail("expected snapshot create of subvolume with retained snapshots to fail")
+
+        # remove snapshot (should remove volume)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # verify list subvolumes returns an empty list
+        subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+        self.assertEqual(len(subvolumels), 0)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_retain_snapshot_without_snapshots(self):
+        """
+        ensure retain snapshots based delete of a subvolume with no snapshots, deletes the subbvolume
+        """
+        subvolume = self._generate_random_subvolume_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # remove with snapshot retention (should remove volume, no snapshots to retain)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # verify list subvolumes returns an empty list
+        subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+        self.assertEqual(len(subvolumels), 0)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_retain_snapshot_trash_busy_recreate(self):
+        """
+        ensure retained subvolume recreate fails if its trash is not yet purged
+        """
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # remove with snapshot retention
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # fake a trash entry
+        self._update_fake_trash(subvolume)
+
+        # recreate subvolume
+        try:
+            self._fs_cmd("subvolume", "create", self.volname, subvolume)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.EAGAIN, "invalid error code on recreate of subvolume with purge pending")
+        else:
+            self.fail("expected recreate of subvolume with purge pending to fail")
+
+        # clear fake trash entry
+        self._update_fake_trash(subvolume, create=False)
+
+        # recreate subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_rm_with_snapshots(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # remove subvolume -- should fail with ENOTEMPTY since it has snapshots
+        try:
+            self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.ENOTEMPTY:
+                raise RuntimeError("invalid error code returned when deleting subvolume with snapshots")
+        else:
+            raise RuntimeError("expected subvolume deletion to fail")
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_protect_unprotect_sanity(self):
+        """
+        Snapshot protect/unprotect commands are deprecated. This test exists to ensure that
+        invoking the command does not cause errors, till they are removed from a subsequent release.
+        """
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=64)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # now, protect snapshot
+        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
+
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
+        # now, unprotect snapshot
+        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_rm_force(self):
+        # test removing non existing subvolume snapshot with --force
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # remove snapshot
+        try:
+            self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, "--force")
+        except CommandFailedError:
+            raise RuntimeError("expected the 'fs subvolume snapshot rm --force' command to succeed")
+
+    def test_subvolume_snapshot_metadata_set(self):
+        """
+        Set custom metadata for subvolume snapshot.
+        """
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+        # set metadata for snapshot.
+        key = "key"
+        value = "value"
+        try:
+            self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume snapshot metadata set' command to succeed")
+
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_metadata_set_idempotence(self):
+        """
+        Set custom metadata for subvolume snapshot (Idempotency).
+        """
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+        # set metadata for snapshot.
+        key = "key"
+        value = "value"
+        try:
+            self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume snapshot metadata set' command to succeed")
+
+        # set same metadata again for subvolume.
+        try:
+            self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume snapshot metadata set' command to succeed because it is idempotent operation")
+
+        # get value for specified key.
+        try:
+            ret = self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, key, group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume snapshot metadata get' command to succeed")
+
+        # remove '\n' from returned value.
+        ret = ret.strip('\n')
+
+        # match received value with expected value.
+        self.assertEqual(value, ret)
+
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_metadata_get(self):
+        """
+        Get custom metadata for a specified key in subvolume snapshot metadata.
+        """
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+        # set metadata for snapshot.
+        key = "key"
+        value = "value"
+        self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+
+        # get value for specified key.
+        try:
+            ret = self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, key, group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume snapshot metadata get' command to succeed")
+
+        # remove '\n' from returned value.
+        ret = ret.strip('\n')
+
+        # match received value with expected value.
+        self.assertEqual(value, ret)
+
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_metadata_get_for_nonexisting_key(self):
+        """
+        Get custom metadata for subvolume snapshot if specified key not exist in metadata.
+        """
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+        # set metadata for snapshot.
+        key = "key"
+        value = "value"
+        self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+
+        # try to get value for nonexisting key
+        # Expecting ENOENT exit status because key does not exist
+        try:
+            self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, "key_nonexist", group)
+        except CommandFailedError as e:
+            self.assertEqual(e.exitstatus, errno.ENOENT)
+        else:
+            self.fail("Expected ENOENT because 'key_nonexist' does not exist")
+
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_metadata_get_for_nonexisting_section(self):
+        """
+        Get custom metadata for subvolume snapshot if metadata is not added for subvolume snapshot.
+        """
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+        # try to get value for nonexisting key (as section does not exist)
+        # Expecting ENOENT exit status because key does not exist
+        try:
+            self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, "key", group)
+        except CommandFailedError as e:
+            self.assertEqual(e.exitstatus, errno.ENOENT)
+        else:
+            self.fail("Expected ENOENT because section does not exist")
+
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_metadata_update(self):
+        """
+        Update custom metadata for a specified key in subvolume snapshot metadata.
+        """
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+        # set metadata for snapshot.
+        key = "key"
+        value = "value"
+        self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+
+        # update metadata against key.
+        new_value = "new_value"
+        self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, new_value, group)
+
+        # get metadata for specified key of snapshot.
+        try:
+            ret = self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, key, group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume snapshot metadata get' command to succeed")
+
+        # remove '\n' from returned value.
+        ret = ret.strip('\n')
+
+        # match received value with expected value.
+        self.assertEqual(new_value, ret)
+
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_metadata_list(self):
+        """
+        List custom metadata for subvolume snapshot.
+        """
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+        # set metadata for subvolume.
+        input_metadata_dict =  {f'key_{i}' : f'value_{i}' for i in range(3)}
+
+        for k, v in input_metadata_dict.items():
+            self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, k, v, group)
+
+        # list metadata
+        try:
+            ret_dict = json.loads(self._fs_cmd("subvolume", "snapshot", "metadata", "ls", self.volname, subvolname, snapshot, group))
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume snapshot metadata ls' command to succeed")
+
+        # compare output with expected output
+        self.assertDictEqual(input_metadata_dict, ret_dict)
+
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_metadata_list_if_no_metadata_set(self):
+        """
+        List custom metadata for subvolume snapshot if metadata is not added for subvolume snapshot.
+        """
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+        # list metadata
+        try:
+            ret_dict = json.loads(self._fs_cmd("subvolume", "snapshot", "metadata", "ls", self.volname, subvolname, snapshot, group))
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume snapshot metadata ls' command to succeed")
+
+        # compare output with expected output
+        empty_dict = {}
+        self.assertDictEqual(ret_dict, empty_dict)
+
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_metadata_remove(self):
+        """
+        Remove custom metadata for a specified key in subvolume snapshot metadata.
+        """
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+        # set metadata for snapshot.
+        key = "key"
+        value = "value"
+        self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+
+        # remove metadata against specified key.
+        try:
+            self._fs_cmd("subvolume", "snapshot", "metadata", "rm", self.volname, subvolname, snapshot, key, group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume snapshot metadata rm' command to succeed")
+
+        # confirm key is removed by again fetching metadata
+        try:
+            self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, key, snapshot, group)
+        except CommandFailedError as e:
+            self.assertEqual(e.exitstatus, errno.ENOENT)
+        else:
+            self.fail("Expected ENOENT because key does not exist")
+
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_metadata_remove_for_nonexisting_key(self):
+        """
+        Remove custom metadata for subvolume snapshot if specified key not exist in metadata.
+        """
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+        # set metadata for snapshot.
+        key = "key"
+        value = "value"
+        self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+
+        # try to remove value for nonexisting key
+        # Expecting ENOENT exit status because key does not exist
+        try:
+            self._fs_cmd("subvolume", "snapshot", "metadata", "rm", self.volname, subvolname, snapshot, "key_nonexist", group)
+        except CommandFailedError as e:
+            self.assertEqual(e.exitstatus, errno.ENOENT)
+        else:
+            self.fail("Expected ENOENT because 'key_nonexist' does not exist")
+
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_metadata_remove_for_nonexisting_section(self):
         """
-        ensure retain snapshots based delete of a subvolume with no snapshots, deletes the subbvolume
+        Remove custom metadata for subvolume snapshot if metadata is not added for subvolume snapshot.
         """
-        subvolume = self._generate_random_subvolume_name()
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
 
-        # create subvolume
-        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
 
-        # remove with snapshot retention (should remove volume, no snapshots to retain)
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
 
-        # verify list subvolumes returns an empty list
-        subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
-        self.assertEqual(len(subvolumels), 0)
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
 
-        # verify trash dir is clean
+        # try to remove value for nonexisting key (as section does not exist)
+        # Expecting ENOENT exit status because key does not exist
+        try:
+            self._fs_cmd("subvolume", "snapshot", "metadata", "rm", self.volname, subvolname, snapshot, "key", group)
+        except CommandFailedError as e:
+            self.assertEqual(e.exitstatus, errno.ENOENT)
+        else:
+            self.fail("Expected ENOENT because section does not exist")
+
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean.
         self._wait_for_trash_empty()
 
-    def test_subvolume_retain_snapshot_trash_busy_recreate(self):
+    def test_subvolume_snapshot_metadata_remove_force(self):
         """
-        ensure retained subvolume recreate fails if its trash is not yet purged
+        Forcefully remove custom metadata for a specified key in subvolume snapshot metadata.
         """
-        subvolume = self._generate_random_subvolume_name()
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
         snapshot = self._generate_random_snapshot_name()
 
-        # create subvolume
-        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
 
         # snapshot subvolume
-        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
 
-        # remove with snapshot retention
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+        # set metadata for snapshot.
+        key = "key"
+        value = "value"
+        self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
 
-        # fake a trash entry
-        self._update_fake_trash(subvolume)
+        # remove metadata against specified key with --force option.
+        try:
+            self._fs_cmd("subvolume", "snapshot", "metadata", "rm", self.volname, subvolname, snapshot, key, group, "--force")
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume snapshot metadata rm' command to succeed")
 
-        # recreate subvolume
+        # confirm key is removed by again fetching metadata
         try:
-            self._fs_cmd("subvolume", "create", self.volname, subvolume)
-        except CommandFailedError as ce:
-            self.assertEqual(ce.exitstatus, errno.EAGAIN, "invalid error code on recreate of subvolume with purge pending")
+            self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, key, group)
+        except CommandFailedError as e:
+            self.assertEqual(e.exitstatus, errno.ENOENT)
         else:
-            self.fail("expected recreate of subvolume with purge pending to fail")
-
-        # clear fake trash entry
-        self._update_fake_trash(subvolume, create=False)
-
-        # recreate subvolume
-        self._fs_cmd("subvolume", "create", self.volname, subvolume)
-
-        # remove snapshot
-        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+            self.fail("Expected ENOENT because key does not exist")
 
-        # remove subvolume
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-        # verify trash dir is clean
+        # verify trash dir is clean.
         self._wait_for_trash_empty()
 
-    def test_subvolume_rm_with_snapshots(self):
-        subvolume = self._generate_random_subvolume_name()
+    def test_subvolume_snapshot_metadata_remove_force_for_nonexisting_key(self):
+        """
+        Forcefully remove custom metadata for subvolume snapshot if specified key not exist in metadata.
+        """
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
         snapshot = self._generate_random_snapshot_name()
 
-        # create subvolume
-        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
 
         # snapshot subvolume
-        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
 
-        # remove subvolume -- should fail with ENOTEMPTY since it has snapshots
+        # set metadata for snapshot.
+        key = "key"
+        value = "value"
+        self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+
+        # remove metadata against specified key.
         try:
-            self._fs_cmd("subvolume", "rm", self.volname, subvolume)
-        except CommandFailedError as ce:
-            if ce.exitstatus != errno.ENOTEMPTY:
-                raise RuntimeError("invalid error code returned when deleting subvolume with snapshots")
+            self._fs_cmd("subvolume", "snapshot", "metadata", "rm", self.volname, subvolname, snapshot, key, group)
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume snapshot metadata rm' command to succeed")
+
+        # confirm key is removed by again fetching metadata
+        try:
+            self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, key, group)
+        except CommandFailedError as e:
+            self.assertEqual(e.exitstatus, errno.ENOENT)
         else:
-            raise RuntimeError("expected subvolume deletion to fail")
+            self.fail("Expected ENOENT because key does not exist")
 
-        # remove snapshot
-        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+        # again remove metadata against already removed key with --force option.
+        try:
+            self._fs_cmd("subvolume", "snapshot", "metadata", "rm", self.volname, subvolname, snapshot, key, group, "--force")
+        except CommandFailedError:
+            self.fail("expected the 'fs subvolume snapshot metadata rm' (with --force) command to succeed")
 
-        # remove subvolume
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-        # verify trash dir is clean
+        # verify trash dir is clean.
         self._wait_for_trash_empty()
 
-    def test_subvolume_snapshot_protect_unprotect_sanity(self):
+    def test_subvolume_snapshot_metadata_after_snapshot_remove(self):
         """
-        Snapshot protect/unprotect commands are deprecated. This test exists to ensure that
-        invoking the command does not cause errors, till they are removed from a subsequent release.
+        Verify metadata removal of subvolume snapshot after snapshot removal.
         """
-        subvolume = self._generate_random_subvolume_name()
+        subvolname = self._generate_random_subvolume_name()
+        group = self._generate_random_group_name()
         snapshot = self._generate_random_snapshot_name()
-        clone = self._generate_random_clone_name()
 
-        # create subvolume
-        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+        # create group.
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
 
-        # do some IO
-        self._do_subvolume_io(subvolume, number_of_files=64)
+        # create subvolume in group.
+        self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
 
         # snapshot subvolume
-        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
-
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
-        # schedule a clone
-        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
 
-        # check clone status
-        self._wait_for_clone_to_complete(clone)
+        # set metadata for snapshot.
+        key = "key"
+        value = "value"
+        self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
 
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+        # get value for specified key.
+        ret = self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, key, group)
 
-        # verify clone
-        self._verify_clone(subvolume, snapshot, clone)
+        # remove '\n' from returned value.
+        ret = ret.strip('\n')
 
-        # remove snapshot
-        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+        # match received value with expected value.
+        self.assertEqual(value, ret)
 
-        # remove subvolumes
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
-        self._fs_cmd("subvolume", "rm", self.volname, clone)
+        # remove subvolume snapshot.
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
 
-        # verify trash dir is clean
-        self._wait_for_trash_empty()
+        # try to get metadata after removing snapshot.
+        # Expecting error ENOENT with error message of snapshot does not exist
+        cmd_ret = self.mgr_cluster.mon_manager.run_cluster_cmd(
+                args=["fs", "subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, key, group],
+                check_status=False, stdout=StringIO(), stderr=StringIO())
+        self.assertEqual(cmd_ret.returncode, errno.ENOENT, "Expecting ENOENT error")
+        self.assertIn(f"snapshot '{snapshot}' does not exist", cmd_ret.stderr.getvalue(),
+                f"Expecting message: snapshot '{snapshot}' does not exist ")
 
-    def test_subvolume_snapshot_rm_force(self):
-        # test removing non existing subvolume snapshot with --force
-        subvolume = self._generate_random_subvolume_name()
-        snapshot = self._generate_random_snapshot_name()
+        # confirm metadata is removed by searching section name in .meta file
+        meta_path = os.path.join(".", "volumes", group, subvolname, ".meta")
+        section_name = "SNAP_METADATA_" + snapshot
 
-        # remove snapshot
         try:
-            self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, "--force")
-        except CommandFailedError:
-            raise RuntimeError("expected the 'fs subvolume snapshot rm --force' command to succeed")
+            self.mount_a.run_shell(f"sudo grep {section_name} {meta_path}", omit_sudo=False)
+        except CommandFailedError as e:
+            self.assertNotEqual(e.exitstatus, 0)
+        else:
+            self.fail("Expected non-zero exist status because section should not exist")
 
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+        # verify trash dir is clean.
+        self._wait_for_trash_empty()
 
 class TestSubvolumeSnapshotClones(TestVolumesHelper):
     """ Tests for FS subvolume snapshot clone operations."""
@@ -3903,6 +5013,208 @@ class TestSubvolumeSnapshotClones(TestVolumesHelper):
         # verify trash dir is clean
         self._wait_for_trash_empty()
 
+    def test_clone_failure_status_pending_in_progress_complete(self):
+        """
+        ensure failure status is not shown when clone is not in failed/cancelled state
+        """
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone1 = self._generate_random_clone_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=200)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # Insert delay at the beginning of snapshot clone
+        self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 5)
+
+        # schedule a clone1
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone1)
+
+        # pending clone shouldn't show failure status
+        clone1_result = self._get_clone_status(clone1)
+        try:
+            clone1_result["status"]["failure"]["errno"]
+        except KeyError as e:
+            self.assertEqual(str(e), "'failure'")
+        else:
+            self.fail("clone status shouldn't show failure for pending clone")
+
+        # check clone1 to be in-progress
+        self._wait_for_clone_to_be_in_progress(clone1)
+
+        # in-progress clone1 shouldn't show failure status
+        clone1_result = self._get_clone_status(clone1)
+        try:
+            clone1_result["status"]["failure"]["errno"]
+        except KeyError as e:
+            self.assertEqual(str(e), "'failure'")
+        else:
+            self.fail("clone status shouldn't show failure for in-progress clone")
+
+        # wait for clone1 to complete
+        self._wait_for_clone_to_complete(clone1)
+
+        # complete clone1 shouldn't show failure status
+        clone1_result = self._get_clone_status(clone1)
+        try:
+            clone1_result["status"]["failure"]["errno"]
+        except KeyError as e:
+            self.assertEqual(str(e), "'failure'")
+        else:
+            self.fail("clone status shouldn't show failure for complete clone")
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone1)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_clone_failure_status_failed(self):
+        """
+        ensure failure status is shown when clone is in failed state and validate the reason
+        """
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone1 = self._generate_random_clone_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=200)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # Insert delay at the beginning of snapshot clone
+        self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 5)
+
+        # schedule a clone1
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone1)
+
+        # remove snapshot from backend to force the clone failure.
+        snappath = os.path.join(".", "volumes", "_nogroup", subvolume, ".snap", snapshot)
+        self.mount_a.run_shell(['rmdir', snappath], sudo=True)
+
+        # wait for clone1 to fail.
+        self._wait_for_clone_to_fail(clone1)
+
+        # check clone1 status
+        clone1_result = self._get_clone_status(clone1)
+        self.assertEqual(clone1_result["status"]["state"], "failed")
+        self.assertEqual(clone1_result["status"]["failure"]["errno"], "2")
+        self.assertEqual(clone1_result["status"]["failure"]["error_msg"], "snapshot '{0}' does not exist".format(snapshot))
+
+        # clone removal should succeed after failure, remove clone1
+        self._fs_cmd("subvolume", "rm", self.volname, clone1, "--force")
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_clone_failure_status_pending_cancelled(self):
+        """
+        ensure failure status is shown when clone is cancelled during pending state and validate the reason
+        """
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone1 = self._generate_random_clone_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=200)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # Insert delay at the beginning of snapshot clone
+        self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 5)
+
+        # schedule a clone1
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone1)
+
+        # cancel pending clone1
+        self._fs_cmd("clone", "cancel", self.volname, clone1)
+
+        # check clone1 status
+        clone1_result = self._get_clone_status(clone1)
+        self.assertEqual(clone1_result["status"]["state"], "canceled")
+        self.assertEqual(clone1_result["status"]["failure"]["errno"], "4")
+        self.assertEqual(clone1_result["status"]["failure"]["error_msg"], "user interrupted clone operation")
+
+        # clone removal should succeed with force after cancelled, remove clone1
+        self._fs_cmd("subvolume", "rm", self.volname, clone1, "--force")
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_clone_failure_status_in_progress_cancelled(self):
+        """
+        ensure failure status is shown when clone is cancelled during in-progress state and validate the reason
+        """
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone1 = self._generate_random_clone_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=200)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # Insert delay at the beginning of snapshot clone
+        self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 5)
+
+        # schedule a clone1
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone1)
+
+        # wait for clone1 to be in-progress
+        self._wait_for_clone_to_be_in_progress(clone1)
+
+        # cancel in-progess clone1
+        self._fs_cmd("clone", "cancel", self.volname, clone1)
+
+        # check clone1 status
+        clone1_result = self._get_clone_status(clone1)
+        self.assertEqual(clone1_result["status"]["state"], "canceled")
+        self.assertEqual(clone1_result["status"]["failure"]["errno"], "4")
+        self.assertEqual(clone1_result["status"]["failure"]["error_msg"], "user interrupted clone operation")
+
+        # clone removal should succeed with force after cancelled, remove clone1
+        self._fs_cmd("subvolume", "rm", self.volname, clone1, "--force")
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_snapshot_clone(self):
         subvolume = self._generate_random_subvolume_name()
         snapshot = self._generate_random_snapshot_name()