diff --git a/ceph/qa/tasks/cephfs/test_volumes.py b/ceph/qa/tasks/cephfs/test_volumes.py
index 7984cea9205c1c83cae12cf8ee57c63d72a10a49..ed78775b687c0972e45fee935d28e552a9e25dcd 100644
--- a/ceph/qa/tasks/cephfs/test_volumes.py
+++ b/ceph/qa/tasks/cephfs/test_volumes.py
@@ -5,9 +5,14 @@ import errno
 import random
 import logging
 import collections
+import uuid
+import unittest
+from hashlib import md5
+from textwrap import dedent
 
 from tasks.cephfs.cephfs_test_case import CephFSTestCase
 from teuthology.exceptions import CommandFailedError
+from teuthology.misc import sudo_write_file
 
 log = logging.getLogger(__name__)
 
@@ -56,9 +61,22 @@ class TestVolumes(CephFSTestCase):
     def _check_clone_canceled(self, clone, clone_group=None):
         self.__check_clone_state("canceled", clone, clone_group, timo=1)
 
-    def _verify_clone_attrs(self, subvolume, clone, source_group=None, clone_group=None):
-        path1 = self._get_subvolume_path(self.volname, subvolume, group_name=source_group)
-        path2 = self._get_subvolume_path(self.volname, clone, group_name=clone_group)
+    def _get_subvolume_snapshot_path(self, subvolume, snapshot, source_group, subvol_path, source_version):
+        if source_version == 2:
+            # v2
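+            # for v2, the snapshot is taken on the subvolume base directory, so the snapshot data lives at <base>/.snap/<snapshot>/<uuid>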
+            if subvol_path is not None:
+                (base_path, uuid_str) = os.path.split(subvol_path)
+            else:
+                (base_path, uuid_str) = os.path.split(self._get_subvolume_path(self.volname, subvolume, group_name=source_group))
+            return os.path.join(base_path, ".snap", snapshot, uuid_str)
+
+        # v1
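+        # for v1, the snapshot is taken on the subvolume path itself: <subvolume path>/.snap/<snapshot>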
+        base_path = self._get_subvolume_path(self.volname, subvolume, group_name=source_group)
+        return os.path.join(base_path, ".snap", snapshot)
+
+    def _verify_clone_attrs(self, source_path, clone_path):
+        path1 = source_path
+        path2 = clone_path
 
         p = self.mount_a.run_shell(["find", path1])
         paths = p.stdout.getvalue().strip().split()
@@ -92,12 +110,38 @@ class TestVolumes(CephFSTestCase):
             cval = int(self.mount_a.run_shell(['stat', '-c' '%Y', sink_path]).stdout.getvalue().strip())
             self.assertEqual(sval, cval)
 
-    def _verify_clone(self, subvolume, clone, source_group=None, clone_group=None, timo=120):
-        path1 = self._get_subvolume_path(self.volname, subvolume, group_name=source_group)
+    def _verify_clone_root(self, source_path, clone_path, clone, clone_group, clone_pool):
+        # verifies the following clone root attrs: quota, data_pool and pool_namespace
+        # remaining attributes of the clone root are validated in _verify_clone_attrs
+
+        clone_info = json.loads(self._get_subvolume_info(self.volname, clone, clone_group))
+
+        # verify quota is inherited from source snapshot
+        src_quota = self.mount_a.getfattr(source_path, "ceph.quota.max_bytes")
+        self.assertEqual(clone_info["bytes_quota"], "infinite" if src_quota is None else int(src_quota))
+
+        if clone_pool:
+            # verify pool is set as per request
+            self.assertEqual(clone_info["data_pool"], clone_pool)
+        else:
+            # verify pool and pool namespace are inherited from snapshot
+            self.assertEqual(clone_info["data_pool"],
+                             self.mount_a.getfattr(source_path, "ceph.dir.layout.pool"))
+            self.assertEqual(clone_info["pool_namespace"],
+                             self.mount_a.getfattr(source_path, "ceph.dir.layout.pool_namespace"))
+
+    def _verify_clone(self, subvolume, snapshot, clone,
+                      source_group=None, clone_group=None, clone_pool=None,
+                      subvol_path=None, source_version=2, timo=120):
+        # pass in subvol_path (subvolume path when snapshot was taken) when subvolume is removed
+        # but snapshots are retained for clone verification
+        path1 = self._get_subvolume_snapshot_path(subvolume, snapshot, source_group, subvol_path, source_version)
         path2 = self._get_subvolume_path(self.volname, clone, group_name=clone_group)
 
         check = 0
-        while check < timo:
+        # TODO: currently snapshot rentries are not stable if snapshot source entries
+        #       are removed, https://tracker.ceph.com/issues/46747
+        while check < timo and subvol_path is None:
             val1 = int(self.mount_a.getfattr(path1, "ceph.dir.rentries"))
             val2 = int(self.mount_a.getfattr(path2, "ceph.dir.rentries"))
             if val1 == val2:
@@ -106,7 +150,8 @@ class TestVolumes(CephFSTestCase):
             time.sleep(1)
         self.assertTrue(check < timo)
 
-        self._verify_clone_attrs(subvolume, clone, source_group=source_group, clone_group=clone_group)
+        self._verify_clone_root(path1, path2, clone, clone_group, clone_pool)
+        self._verify_clone_attrs(path1, path2)
 
     def _generate_random_volume_name(self, count=1):
         n = self.volume_start
@@ -184,6 +229,25 @@ class TestVolumes(CephFSTestCase):
     def _delete_test_volume(self):
         self._fs_cmd("volume", "rm", self.volname, "--yes-i-really-mean-it")
 
+    def _do_subvolume_pool_and_namespace_update(self, subvolume, pool=None, pool_namespace=None, subvolume_group=None):
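+        # out-of-band update of the subvolume's data pool and/or pool namespace via the layout xattrs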
+        subvolpath = self._get_subvolume_path(self.volname, subvolume, group_name=subvolume_group)
+
+        if pool is not None:
+            self.mount_a.setfattr(subvolpath, 'ceph.dir.layout.pool', pool)
+
+        if pool_namespace is not None:
+            self.mount_a.setfattr(subvolpath, 'ceph.dir.layout.pool_namespace', pool_namespace)
+
+    def _do_subvolume_attr_update(self, subvolume, uid, gid, mode, subvolume_group=None):
+        subvolpath = self._get_subvolume_path(self.volname, subvolume, group_name=subvolume_group)
+
+        # mode
+        self.mount_a.run_shell(['chmod', mode, subvolpath])
+
+        # ownership
+        self.mount_a.run_shell(['chown', uid, subvolpath])
+        self.mount_a.run_shell(['chgrp', gid, subvolpath])
+
     def _do_subvolume_io(self, subvolume, subvolume_group=None, create_dir=None,
                          number_of_files=DEFAULT_NUMBER_OF_FILES, file_size=DEFAULT_FILE_SIZE):
         # get subvolume path for IO
@@ -228,6 +292,60 @@ class TestVolumes(CephFSTestCase):
         trashdir = os.path.join("./", "volumes", "_deleting")
         self.mount_a.wait_for_dir_empty(trashdir, timeout=timeout)
 
+    def _assert_meta_location_and_version(self, vol_name, subvol_name, subvol_group=None, version=2, legacy=False):
+        if legacy:
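+            # legacy subvolumes keep their metadata under volumes/_legacy, in a file named after the md5 hash of the subvolume path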
+            subvol_path = self._get_subvolume_path(vol_name, subvol_name, group_name=subvol_group)
+            m = md5()
+            m.update(("/"+subvol_path).encode('utf-8'))
+            meta_filename = "{0}.meta".format(m.digest().hex())
+            metapath = os.path.join(".", "volumes", "_legacy", meta_filename)
+        else:
+            group = subvol_group if subvol_group is not None else '_nogroup'
+            metapath = os.path.join(".", "volumes", group, subvol_name, ".meta")
+
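+        # the .meta file is a simple ini-style config; scan it for the expected 'version = <n>' entry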
+        out = self.mount_a.run_shell(['cat', metapath])
+        lines = out.stdout.getvalue().strip().split('\n')
+        sv_version = -1
+        for line in lines:
+            if line == "version = " + str(version):
+                sv_version = version
+                break
+        self.assertEqual(sv_version, version, "version expected was '{0}' but got '{1}' from meta file at '{2}'".format(
+                         version, sv_version, metapath))
+
+    def _create_v1_subvolume(self, subvol_name, subvol_group=None, has_snapshot=True, subvol_type='subvolume', state='complete'):
+        group = subvol_group if subvol_group is not None else '_nogroup'
+        basepath = os.path.join("volumes", group, subvol_name)
+        uuid_str = str(uuid.uuid4())
+        createpath = os.path.join(basepath, uuid_str)
+        self.mount_a.run_shell(['mkdir', '-p', createpath])
+
+        # create a v1 snapshot, to prevent auto upgrades
+        if has_snapshot:
+            snappath = os.path.join(createpath, ".snap", "fake")
+            self.mount_a.run_shell(['mkdir', '-p', snappath])
+
+        # add required xattrs to subvolume
+        default_pool = self.mount_a.getfattr(".", "ceph.dir.layout.pool")
+        self.mount_a.setfattr(createpath, 'ceph.dir.layout.pool', default_pool)
+
+        # create a v1 .meta file
+        meta_contents = "[GLOBAL]\nversion = 1\ntype = {0}\npath = {1}\nstate = {2}\n".format(subvol_type, "/" + createpath, state)
+        if state == 'pending':
+            # add a fake clone source
+            meta_contents = meta_contents + '[source]\nvolume = fake\nsubvolume = fake\nsnapshot = fake\n'
+        meta_filepath1 = os.path.join(self.mount_a.mountpoint, basepath, ".meta")
+        sudo_write_file(self.mount_a.client_remote, meta_filepath1, meta_contents)
+        return createpath
+
+    def _update_fake_trash(self, subvol_name, subvol_group=None, trash_name='fake', create=True):
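+        # create (or remove) an entry under the subvolume's .trash directory to emulate a pending purge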
+        group = subvol_group if subvol_group is not None else '_nogroup'
+        trashpath = os.path.join("volumes", group, subvol_name, '.trash', trash_name)
+        if create:
+            self.mount_a.run_shell(['mkdir', '-p', trashpath])
+        else:
+            self.mount_a.run_shell(['rmdir', trashpath])
+
     def setUp(self):
         super(TestVolumes, self).setUp()
         self.volname = None
@@ -308,6 +426,8 @@ class TestVolumes(CephFSTestCase):
         That the volume can only be removed when --yes-i-really-mean-it is used
         and verify that the deleted volume is not listed anymore.
         """
+        for m in self.mounts:
+            m.umount_wait()
         try:
             self._fs_cmd("volume", "rm", self.volname)
         except CommandFailedError as ce:
@@ -325,11 +445,49 @@ class TestVolumes(CephFSTestCase):
         else:
             raise RuntimeError("expected the 'fs volume rm' command to fail.")
 
+    def test_subvolume_marked(self):
+        """
+        ensure a subvolume is marked with the ceph.dir.subvolume xattr
+        """
+        subvolume = self._generate_random_subvolume_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # getpath
+        subvolpath = self._get_subvolume_path(self.volname, subvolume)
+
+        # a subdirectory of a subvolume cannot be moved outside the subvolume once it is marked
+        # with the ceph.dir.subvolume xattr, hence test this by attempting to rename the subvol
+        # path (incarnation) to a location outside the subvolume
+        dstpath = os.path.join(self.mount_a.mountpoint, 'volumes', '_nogroup', 'new_subvol_location')
+        srcpath = os.path.join(self.mount_a.mountpoint, subvolpath)
+        rename_script = dedent("""
+            import os
+            import errno
+            try:
+                os.rename("{src}", "{dst}")
+            except OSError as e:
+                if e.errno != errno.EXDEV:
+                    raise RuntimeError("invalid error code on renaming subvolume incarnation out of subvolume directory")
+            else:
+                raise RuntimeError("expected renaming subvolume incarnation out of subvolume directory to fail")
+            """)
+        self.mount_a.run_python(rename_script.format(src=srcpath, dst=dstpath))
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_volume_rm_arbitrary_pool_removal(self):
         """
         That the arbitrary pool added to the volume out of band is removed
         successfully on volume removal.
         """
+        for m in self.mounts:
+            m.umount_wait()
         new_pool = "new_pool"
         # add arbitrary data pool
         self.fs.add_data_pool(new_pool)
@@ -351,6 +509,8 @@ class TestVolumes(CephFSTestCase):
         That the volume can only be removed when mon_allow_pool_delete is set
         to true and verify that the pools are removed after volume deletion.
         """
+        for m in self.mounts:
+            m.umount_wait()
         self.config_set('mon', 'mon_allow_pool_delete', False)
         try:
             self._fs_cmd("volume", "rm", self.volname, "--yes-i-really-mean-it")
@@ -420,6 +580,12 @@ class TestVolumes(CephFSTestCase):
         size = int(self.mount_a.getfattr(subvolpath, "ceph.quota.max_bytes"))
         self.assertEqual(size, nsize)
 
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_shrink(self):
         """
         That a subvolume can be shrunk in size and its quota matches the expected size.
@@ -442,6 +608,12 @@ class TestVolumes(CephFSTestCase):
         size = int(self.mount_a.getfattr(subvolpath, "ceph.quota.max_bytes"))
         self.assertEqual(size, nsize)
 
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_resize_fail_invalid_size(self):
         """
         That a subvolume cannot be resized to an invalid size and the quota did not change
@@ -461,15 +633,20 @@ class TestVolumes(CephFSTestCase):
         try:
             self._fs_cmd("subvolume", "resize", self.volname, subvolname, str(nsize))
         except CommandFailedError as ce:
-            if ce.exitstatus != errno.EINVAL:
-                raise
+            self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on resize of subvolume with invalid size")
         else:
-            raise RuntimeError("expected the 'fs subvolume resize' command to fail")
+            self.fail("expected the 'fs subvolume resize' command to fail")
 
         # verify the quota did not change
         size = int(self.mount_a.getfattr(subvolpath, "ceph.quota.max_bytes"))
         self.assertEqual(size, osize)
 
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_resize_fail_zero_size(self):
         """
         That a subvolume cannot be resized to a zero size and the quota did not change
@@ -489,15 +666,20 @@ class TestVolumes(CephFSTestCase):
         try:
             self._fs_cmd("subvolume", "resize", self.volname, subvolname, str(nsize))
         except CommandFailedError as ce:
-            if ce.exitstatus != errno.EINVAL:
-                raise
+            self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on resize of subvolume with invalid size")
         else:
-            raise RuntimeError("expected the 'fs subvolume resize' command to fail")
+            self.fail("expected the 'fs subvolume resize' command to fail")
 
         # verify the quota did not change
         size = int(self.mount_a.getfattr(subvolpath, "ceph.quota.max_bytes"))
         self.assertEqual(size, osize)
 
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_resize_quota_lt_used_size(self):
         """
         That a subvolume can be resized to a size smaller than the current used size
@@ -531,12 +713,18 @@ class TestVolumes(CephFSTestCase):
         try:
             self._fs_cmd("subvolume", "resize", self.volname, subvolname, str(nsize))
         except CommandFailedError:
-            raise RuntimeError("expected the 'fs subvolume resize' command to succeed")
+            self.fail("expected the 'fs subvolume resize' command to succeed")
 
         # verify the quota
         size = int(self.mount_a.getfattr(subvolpath, "ceph.quota.max_bytes"))
         self.assertEqual(size, nsize)
 
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
 
     def test_subvolume_resize_fail_quota_lt_used_size_no_shrink(self):
         """
@@ -571,15 +759,20 @@ class TestVolumes(CephFSTestCase):
         try:
             self._fs_cmd("subvolume", "resize", self.volname, subvolname, str(nsize), "--no_shrink")
         except CommandFailedError as ce:
-            if ce.exitstatus != errno.EINVAL:
-                raise
+            self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on resize of subvolume with invalid size")
         else:
-            raise RuntimeError("expected the 'fs subvolume resize' command to fail")
+            self.fail("expected the 'fs subvolume resize' command to fail")
 
         # verify the quota did not change
         size = int(self.mount_a.getfattr(subvolpath, "ceph.quota.max_bytes"))
         self.assertEqual(size, osize)
 
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_resize_expand_on_full_subvolume(self):
         """
         That the subvolume can be expanded from a full subvolume and future writes succeed.
@@ -617,12 +810,18 @@ class TestVolumes(CephFSTestCase):
             try:
                 self.mount_a.write_n_mb(os.path.join(subvolpath, filename), file_size)
             except CommandFailedError:
-                raise RuntimeError("expected filling subvolume {0} with {1} file of size {2}MB"
+                self.fail("expected filling subvolume {0} with {1} file of size {2}MB "
                                    "to succeed".format(subvolname, number_of_files, file_size))
         else:
-            raise RuntimeError("expected filling subvolume {0} with {1} file of size {2}MB"
+            self.fail("expected filling subvolume {0} with {1} file of size {2}MB "
                                "to fail".format(subvolname, number_of_files, file_size))
 
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_create_idempotence(self):
         # create subvolume
         subvolume = self._generate_random_subvolume_name()
@@ -668,6 +867,12 @@ class TestVolumes(CephFSTestCase):
         self._get_subtrees(status=status, rank=1)
         self._wait_subtrees([(path, 1)], status=status)
 
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolumegroup_pin_distributed(self):
         self.fs.set_max_mds(2)
         status = self.fs.wait_for_daemons()
@@ -683,6 +888,13 @@ class TestVolumes(CephFSTestCase):
             self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
         self._wait_distributed_subtrees(10, status=status)
 
+        # remove subvolumes
+        for subvolume in subvolumes:
+            self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_pin_random(self):
         self.fs.set_max_mds(2)
         self.fs.wait_for_daemons()
@@ -693,6 +905,12 @@ class TestVolumes(CephFSTestCase):
         self._fs_cmd("subvolume", "pin", self.volname, subvolume, "random", ".01")
         # no verification
 
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_create_isolated_namespace(self):
         """
         Create subvolume in separate rados namespace
@@ -720,10 +938,12 @@ class TestVolumes(CephFSTestCase):
         try:
             self._fs_cmd("subvolume", "create", self.volname, subvolume, "--pool_layout", data_pool)
         except CommandFailedError as ce:
-            if ce.exitstatus != errno.EINVAL:
-                raise
+            self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on create of subvolume with invalid pool layout")
         else:
-            raise RuntimeError("expected the 'fs subvolume create' command to fail")
+            self.fail("expected the 'fs subvolume create' command to fail")
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
 
     def test_subvolume_rm_force(self):
         # test removing non-existing subvolume with --force
@@ -731,7 +951,7 @@ class TestVolumes(CephFSTestCase):
         try:
             self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--force")
         except CommandFailedError:
-            raise RuntimeError("expected the 'fs subvolume rm --force' command to succeed")
+            self.fail("expected the 'fs subvolume rm --force' command to succeed")
 
     def test_subvolume_create_with_auto_cleanup_on_fail(self):
         subvolume = self._generate_random_subvolume_name()
@@ -744,10 +964,12 @@ class TestVolumes(CephFSTestCase):
         try:
             self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
         except CommandFailedError as ce:
-            if ce.exitstatus != errno.ENOENT:
-                raise
+            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on getpath of non-existent subvolume")
         else:
-            raise RuntimeError("expected the 'fs subvolume getpath' command to fail")
+            self.fail("expected the 'fs subvolume getpath' command to fail")
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
 
     def test_subvolume_create_with_invalid_size(self):
         # create subvolume with an invalid size -1
@@ -755,10 +977,12 @@ class TestVolumes(CephFSTestCase):
         try:
             self._fs_cmd("subvolume", "create", self.volname, subvolume, "--size", "-1")
         except CommandFailedError as ce:
-            if ce.exitstatus != errno.EINVAL:
-                raise
+            self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on create of subvolume with invalid size")
         else:
-            raise RuntimeError("expected the 'fs subvolume create' command to fail")
+            self.fail("expected the 'fs subvolume create' command to fail")
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
 
     def test_nonexistent_subvolume_rm(self):
         # remove non-existing subvolume
@@ -803,6 +1027,9 @@ class TestVolumes(CephFSTestCase):
         # remove subvolume
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
 
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_ls(self):
         # tests the 'fs subvolume ls' command
 
@@ -816,11 +1043,18 @@ class TestVolumes(CephFSTestCase):
         # list subvolumes
         subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
         if len(subvolumels) == 0:
-            raise RuntimeError("Expected the 'fs subvolume ls' command to list the created subvolumes.")
+            self.fail("Expected the 'fs subvolume ls' command to list the created subvolumes.")
         else:
             subvolnames = [subvolume['name'] for subvolume in subvolumels]
             if collections.Counter(subvolnames) != collections.Counter(subvolumes):
-                raise RuntimeError("Error creating or listing subvolumes")
+                self.fail("Error creating or listing subvolumes")
+
+        # remove subvolume
+        for subvolume in subvolumes:
+            self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
 
     def test_subvolume_ls_for_notexistent_default_group(self):
         # tests the 'fs subvolume ls' command when the default group '_nogroup' doesn't exist
@@ -853,6 +1087,12 @@ class TestVolumes(CephFSTestCase):
         size = self.mount_a.getfattr(subvolpath, "ceph.quota.max_bytes")
         self.assertEqual(size, None)
 
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_resize_infinite_size_future_writes(self):
         """
         That a subvolume can be resized to an infinite size and the future writes succeed.
@@ -885,15 +1125,21 @@ class TestVolumes(CephFSTestCase):
         try:
             self.mount_a.write_n_mb(os.path.join(subvolpath, filename), file_size)
         except CommandFailedError:
-            raise RuntimeError("expected filling subvolume {0} with {1} file of size {2}MB "
+            self.fail("expected filling subvolume {0} with {1} file of size {2}MB "
                                "to succeed".format(subvolname, number_of_files, file_size))
 
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolname)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_info(self):
         # tests the 'fs subvolume info' command
 
         subvol_md = ["atime", "bytes_pcent", "bytes_quota", "bytes_used", "created_at", "ctime",
                      "data_pool", "gid", "mode", "mon_addrs", "mtime", "path", "pool_namespace",
-                     "type", "uid", "features"]
+                     "type", "uid", "features", "state"]
 
         # create subvolume
         subvolume = self._generate_random_subvolume_name()
@@ -901,17 +1147,17 @@ class TestVolumes(CephFSTestCase):
 
         # get subvolume metadata
         subvol_info = json.loads(self._get_subvolume_info(self.volname, subvolume))
-        self.assertNotEqual(len(subvol_info), 0, "expected the 'fs subvolume info' command to list metadata of subvolume")
         for md in subvol_md:
-            self.assertIn(md, subvol_info.keys(), "'{0}' key not present in metadata of subvolume".format(md))
+            self.assertIn(md, subvol_info, "'{0}' key not present in metadata of subvolume".format(md))
 
         self.assertEqual(subvol_info["bytes_pcent"], "undefined", "bytes_pcent should be set to undefined if quota is not set")
         self.assertEqual(subvol_info["bytes_quota"], "infinite", "bytes_quota should be set to infinite if quota is not set")
         self.assertEqual(subvol_info["pool_namespace"], "", "expected pool namespace to be empty")
+        self.assertEqual(subvol_info["state"], "complete", "expected state to be complete")
 
-        self.assertEqual(len(subvol_info["features"]), 2,
-                         msg="expected 2 features, found '{0}' ({1})".format(len(subvol_info["features"]), subvol_info["features"]))
-        for feature in ['snapshot-clone', 'snapshot-autoprotect']:
+        self.assertEqual(len(subvol_info["features"]), 3,
+                         msg="expected 3 features, found '{0}' ({1})".format(len(subvol_info["features"]), subvol_info["features"]))
+        for feature in ['snapshot-clone', 'snapshot-autoprotect', 'snapshot-retention']:
             self.assertIn(feature, subvol_info["features"], msg="expected feature '{0}' in subvolume".format(feature))
 
         nsize = self.DEFAULT_FILE_SIZE*1024*1024
@@ -919,15 +1165,17 @@ class TestVolumes(CephFSTestCase):
 
         # get subvolume metadata after quota set
         subvol_info = json.loads(self._get_subvolume_info(self.volname, subvolume))
-        self.assertNotEqual(len(subvol_info), 0, "expected the 'fs subvolume info' command to list metadata of subvolume")
+        for md in subvol_md:
+            self.assertIn(md, subvol_info, "'{0}' key not present in metadata of subvolume".format(md))
 
         self.assertNotEqual(subvol_info["bytes_pcent"], "undefined", "bytes_pcent should not be set to undefined if quota is set")
-        self.assertNotEqual(subvol_info["bytes_quota"], "infinite", "bytes_quota should not be set to infinite if quota is not set")
+        self.assertEqual(subvol_info["bytes_quota"], nsize, "bytes_quota should be set to '{0}'".format(nsize))
         self.assertEqual(subvol_info["type"], "subvolume", "type should be set to subvolume")
+        self.assertEqual(subvol_info["state"], "complete", "expected state to be complete")
 
-        self.assertEqual(len(subvol_info["features"]), 2,
-                         msg="expected 2 features, found '{0}' ({1})".format(len(subvol_info["features"]), subvol_info["features"]))
-        for feature in ['snapshot-clone', 'snapshot-autoprotect']:
+        self.assertEqual(len(subvol_info["features"]), 3,
+                         msg="expected 3 features, found '{0}' ({1})".format(len(subvol_info["features"]), subvol_info["features"]))
+        for feature in ['snapshot-clone', 'snapshot-autoprotect', 'snapshot-retention']:
             self.assertIn(feature, subvol_info["features"], msg="expected feature '{0}' in subvolume".format(feature))
 
         # remove subvolumes
@@ -1095,6 +1343,9 @@ class TestVolumes(CephFSTestCase):
         self._fs_cmd("subvolume", "rm", self.volname, subvol1, group)
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_group_create_with_desired_mode(self):
         group1, group2 = self._generate_random_group_name(2)
         # default mode
@@ -1178,6 +1429,9 @@ class TestVolumes(CephFSTestCase):
         self._fs_cmd("subvolume", "rm", self.volname, subvol3, group)
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_subvolume_create_with_desired_uid_gid(self):
         """
         That the subvolume can be created with the desired uid and gid and its uid and gid matches the
@@ -1203,6 +1457,9 @@ class TestVolumes(CephFSTestCase):
         # remove subvolume
         self._fs_cmd("subvolume", "rm", self.volname, subvolname)
 
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
     def test_nonexistent_subvolume_group_rm(self):
         group = "non_existent_group"
 
@@ -1286,10 +1543,10 @@ class TestVolumes(CephFSTestCase):
         tests the 'fs subvolume snapshot info' command
         """
 
-        snap_metadata = ["created_at", "data_pool", "has_pending_clones", "size"]
+        snap_md = ["created_at", "data_pool", "has_pending_clones", "size"]
 
         subvolume = self._generate_random_subvolume_name()
-        snapshot = self._generate_random_snapshot_name()
+        snapshot, snap_missing = self._generate_random_snapshot_name(2)
 
         # create subvolume
         self._fs_cmd("subvolume", "create", self.volname, subvolume)
@@ -1301,12 +1558,18 @@ class TestVolumes(CephFSTestCase):
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
         snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot))
-        self.assertNotEqual(len(snap_info), 0)
-        for md in snap_metadata:
-            if md not in snap_info:
-                raise RuntimeError("%s not present in the metadata of subvolume snapshot" % md)
+        for md in snap_md:
+            self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
         self.assertEqual(snap_info["has_pending_clones"], "no")
 
+        # snapshot info for non-existent snapshot
+        try:
+            self._get_subvolume_snapshot_info(self.volname, subvolume, snap_missing)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on snapshot info of non-existent snapshot")
+        else:
+            self.fail("expected snapshot info of non-existent snapshot to fail")
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
@@ -1419,12 +1682,41 @@ class TestVolumes(CephFSTestCase):
 
         subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume))
         if len(subvolsnapshotls) == 0:
-            raise RuntimeError("Expected the 'fs subvolume snapshot ls' command to list the created subvolume snapshots")
+            self.fail("Expected the 'fs subvolume snapshot ls' command to list the created subvolume snapshots")
         else:
             snapshotnames = [snapshot['name'] for snapshot in subvolsnapshotls]
             if collections.Counter(snapshotnames) != collections.Counter(snapshots):
-                raise RuntimeError("Error creating or listing subvolume snapshots")
+                self.fail("Error creating or listing subvolume snapshots")
+
+        # remove snapshot
+        for snapshot in snapshots:
+            self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_group_snapshot_unsupported_status(self):
+        group = self._generate_random_group_name()
+        snapshot = self._generate_random_snapshot_name()
 
+        # create group
+        self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+        # snapshot group
+        try:
+            self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOSYS, "invalid error code on subvolumegroup snapshot create")
+        else:
+            self.fail("expected subvolumegroup snapshot create command to fail")
+
+        # remove group
+        self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+    @unittest.skip("skipping subvolumegroup snapshot tests")
     def test_subvolume_group_snapshot_create_and_rm(self):
         subvolume = self._generate_random_subvolume_name()
         group = self._generate_random_group_name()
@@ -1451,6 +1743,7 @@ class TestVolumes(CephFSTestCase):
         # remove group
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
+    @unittest.skip("skipping subvolumegroup snapshot tests")
     def test_subvolume_group_snapshot_idempotence(self):
         subvolume = self._generate_random_subvolume_name()
         group = self._generate_random_group_name()
@@ -1480,6 +1773,7 @@ class TestVolumes(CephFSTestCase):
         # remove group
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
+    @unittest.skip("skipping subvolumegroup snapshot tests")
     def test_nonexistent_subvolume_group_snapshot_rm(self):
         subvolume = self._generate_random_subvolume_name()
         group = self._generate_random_group_name()
@@ -1515,6 +1809,7 @@ class TestVolumes(CephFSTestCase):
         # remove group
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
+    @unittest.skip("skipping subvolumegroup snapshot tests")
     def test_subvolume_group_snapshot_rm_force(self):
         # test removing non-existing subvolume group snapshot with --force
         group = self._generate_random_group_name()
@@ -1525,6 +1820,7 @@ class TestVolumes(CephFSTestCase):
         except CommandFailedError:
             raise RuntimeError("expected the 'fs subvolumegroup snapshot rm --force' command to succeed")
 
+    @unittest.skip("skipping subvolumegroup snapshot tests")
     def test_subvolume_group_snapshot_ls(self):
         # tests the 'fs subvolumegroup snapshot ls' command
 
@@ -1583,11 +1879,12 @@ class TestVolumes(CephFSTestCase):
         self.mgr_cluster.mgr_fail(mgr)
         self.wait_until_evicted(sessions[0]['id'])
 
-    def test_subvolume_upgrade(self):
+    def test_subvolume_upgrade_legacy_to_v1(self):
         """
         poor man's upgrade test -- rather than going through a full upgrade cycle,
         emulate subvolumes by going through the wormhole and verify if they are
         accessible.
+        further ensure that a legacy subvolume is not updated to v2.
         """
         subvolume1, subvolume2 = self._generate_random_subvolume_name(2)
         group = self._generate_random_group_name()
@@ -1614,6 +1911,10 @@ class TestVolumes(CephFSTestCase):
         self.assertEqual(createpath1[1:], subvolpath1)
         self.assertEqual(createpath2[1:], subvolpath2)
 
+        # ensure metadata file is in legacy location, with required version v1
+        self._assert_meta_location_and_version(self.volname, subvolume1, version=1, legacy=True)
+        self._assert_meta_location_and_version(self.volname, subvolume2, subvol_group=group, version=1, legacy=True)
+
         # remove subvolume
         self._fs_cmd("subvolume", "rm", self.volname, subvolume1)
         self._fs_cmd("subvolume", "rm", self.volname, subvolume2, group)
@@ -1624,78 +1925,785 @@ class TestVolumes(CephFSTestCase):
         # remove group
         self._fs_cmd("subvolumegroup", "rm", self.volname, group)
 
-    def test_subvolume_rm_with_snapshots(self):
+    def test_subvolume_no_upgrade_v1_sanity(self):
+        """
+        poor man's upgrade test -- theme continues...
+
+        This test is to ensure v1 subvolumes are retained as is, due to a snapshot being present, and runs through
+        a series of operations on the v1 subvolume to ensure they work as expected.
+        """
+        subvol_md = ["atime", "bytes_pcent", "bytes_quota", "bytes_used", "created_at", "ctime",
+                     "data_pool", "gid", "mode", "mon_addrs", "mtime", "path", "pool_namespace",
+                     "type", "uid", "features", "state"]
+        snap_md = ["created_at", "data_pool", "has_pending_clones", "size"]
+
         subvolume = self._generate_random_subvolume_name()
         snapshot = self._generate_random_snapshot_name()
+        clone1, clone2 = self._generate_random_clone_name(2)
+        mode = "777"
+        uid  = "1000"
+        gid  = "1000"
 
-        # create subvolume
-        self._fs_cmd("subvolume", "create", self.volname, subvolume)
-
-        # snapshot subvolume
-        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+        # emulate a v1 subvolume -- in the default group
+        subvolume_path = self._create_v1_subvolume(subvolume)
 
-        # remove subvolume -- should fail with ENOTEMPTY since it has snapshots
-        try:
-            self._fs_cmd("subvolume", "rm", self.volname, subvolume)
-        except CommandFailedError as ce:
-            if ce.exitstatus != errno.ENOTEMPTY:
-                raise RuntimeError("invalid error code returned when deleting subvolume with snapshots")
-        else:
-            raise RuntimeError("expected subvolume deletion to fail")
+        # getpath
+        subvolpath = self._get_subvolume_path(self.volname, subvolume)
+        self.assertEqual(subvolpath, subvolume_path)
 
-        # remove snapshot
-        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+        # ls
+        subvolumes = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+        self.assertEqual(len(subvolumes), 1, "subvolume ls count mismatch, expected '1', found {0}".format(len(subvolumes)))
+        self.assertEqual(subvolumes[0]['name'], subvolume,
+                         "subvolume name mismatch in ls output, expected '{0}', found '{1}'".format(subvolume, subvolumes[0]['name']))
 
-        # remove subvolume
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        # info
+        subvol_info = json.loads(self._get_subvolume_info(self.volname, subvolume))
+        for md in subvol_md:
+            self.assertIn(md, subvol_info, "'{0}' key not present in metadata of subvolume".format(md))
 
-        # verify trash dir is clean
-        self._wait_for_trash_empty()
+        self.assertEqual(subvol_info["state"], "complete",
+                         msg="expected state to be 'complete', found '{0}'".format(subvol_info["state"]))
+        self.assertEqual(len(subvol_info["features"]), 2,
+                         msg="expected 2 features, found '{0}' ({1})".format(len(subvol_info["features"]), subvol_info["features"]))
+        for feature in ['snapshot-clone', 'snapshot-autoprotect']:
+            self.assertIn(feature, subvol_info["features"], msg="expected feature '{0}' in subvolume".format(feature))
 
-    def test_subvolume_snapshot_protect_unprotect_sanity(self):
-        """
-        Snapshot protect/unprotect commands are deprecated. This test exists to ensure that
-        invoking the command does not cause errors, till they are removed from a subsequent release.
-        """
-        subvolume = self._generate_random_subvolume_name()
-        snapshot = self._generate_random_snapshot_name()
-        clone = self._generate_random_clone_name()
+        # resize
+        nsize = self.DEFAULT_FILE_SIZE*1024*1024*10
+        self._fs_cmd("subvolume", "resize", self.volname, subvolume, str(nsize))
+        subvol_info = json.loads(self._get_subvolume_info(self.volname, subvolume))
+        for md in subvol_md:
+            self.assertIn(md, subvol_info, "'{0}' key not present in metadata of subvolume".format(md))
+        self.assertEqual(subvol_info["bytes_quota"], nsize, "bytes_quota should be set to '{0}'".format(nsize))
 
-        # create subvolume
-        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+        # create (idempotent) (change some attrs, to ensure attrs are preserved from the snapshot on clone)
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode", mode, "--uid", uid, "--gid", gid)
 
         # do some IO
-        self._do_subvolume_io(subvolume, number_of_files=64)
+        self._do_subvolume_io(subvolume, number_of_files=8)
 
-        # snapshot subvolume
+        # snap-create
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
-        # now, protect snapshot
-        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
-        # schedule a clone
-        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+        # clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone1)
 
         # check clone status
-        self._wait_for_clone_to_complete(clone)
-
-        # now, unprotect snapshot
-        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+        self._wait_for_clone_to_complete(clone1)
 
-        # remove snapshot
-        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+        # ensure clone is v2
+        self._assert_meta_location_and_version(self.volname, clone1, version=2)
 
         # verify clone
-        self._verify_clone(subvolume, clone)
+        self._verify_clone(subvolume, snapshot, clone1, source_version=1)
 
-        # remove subvolumes
-        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
-        self._fs_cmd("subvolume", "rm", self.volname, clone)
+        # clone (older snapshot)
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, 'fake', clone2)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone2)
+
+        # ensure clone is v2
+        self._assert_meta_location_and_version(self.volname, clone2, version=2)
+
+        # verify clone
+        # TODO: rentries will mismatch till this is fixed https://tracker.ceph.com/issues/46747
+        #self._verify_clone(subvolume, 'fake', clone2, source_version=1)
+
+        # snap-info
+        snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot))
+        for md in snap_md:
+            self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
+        self.assertEqual(snap_info["has_pending_clones"], "no")
+
+        # snap-ls
+        subvol_snapshots = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume))
+        self.assertEqual(len(subvol_snapshots), 2, "subvolume snapshot ls count mismatch, expected '2', found {0}".format(len(subvol_snapshots)))
+        snapshotnames = [snapshot['name'] for snapshot in subvol_snapshots]
+        for name in [snapshot, 'fake']:
+            self.assertIn(name, snapshotnames, msg="expected snapshot '{0}' in subvolume snapshot ls".format(name))
+
+        # snap-rm
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, "fake")
+
+        # ensure volume is still at version 1
+        self._assert_meta_location_and_version(self.volname, subvolume, version=1)
+
+        # rm
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone1)
+        self._fs_cmd("subvolume", "rm", self.volname, clone2)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_no_upgrade_v1_to_v2(self):
+        """
+        poor man's upgrade test -- theme continues...
+        ensure v1 to v2 upgrades are not done automatically due to various states of v1 subvolumes
+        """
+        subvolume1, subvolume2, subvolume3 = self._generate_random_subvolume_name(3)
+        group = self._generate_random_group_name()
+
+        # emulate a v1 subvolume -- in the default group
+        subvol1_path = self._create_v1_subvolume(subvolume1)
+
+        # emulate a v1 subvolume -- in a custom group
+        subvol2_path = self._create_v1_subvolume(subvolume2, subvol_group=group)
+
+        # emulate a v1 subvolume -- in a clone pending state
+        self._create_v1_subvolume(subvolume3, subvol_type='clone', has_snapshot=False, state='pending')
+
+        # this would attempt auto-upgrade on access, but fail to do so as snapshots exist
+        subvolpath1 = self._get_subvolume_path(self.volname, subvolume1)
+        self.assertEqual(subvolpath1, subvol1_path)
+
+        subvolpath2 = self._get_subvolume_path(self.volname, subvolume2, group_name=group)
+        self.assertEqual(subvolpath2, subvol2_path)
+
+        # this would attempt auto-upgrade on access, but fail to do so as volume is not complete
+        # use clone status, as only certain operations are allowed in pending state
+        status = json.loads(self._fs_cmd("clone", "status", self.volname, subvolume3))
+        self.assertEqual(status["status"]["state"], "pending")
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume1, "fake")
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume2, "fake", group)
+
+        # ensure metadata file is in v1 location, with version retained as v1
+        self._assert_meta_location_and_version(self.volname, subvolume1, version=1)
+        self._assert_meta_location_and_version(self.volname, subvolume2, subvol_group=group, version=1)
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume1)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume2, group)
+        try:
+            self._fs_cmd("subvolume", "rm", self.volname, subvolume3)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.EAGAIN, "invalid error code on rm of subvolume undergoing clone")
+        else:
+            self.fail("expected rm of subvolume undergoing clone to fail")
+
+        # ensure metadata file is in v1 location, with version retained as v1
+        self._assert_meta_location_and_version(self.volname, subvolume3, version=1)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume3, "--force")
+
+        # verify list subvolumes returns an empty list
+        subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+        self.assertEqual(len(subvolumels), 0)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_upgrade_v1_to_v2(self):
+        """
+        poor man's upgrade test -- theme continues...
+        ensure v1 to v2 upgrades work
+        """
+        subvolume1, subvolume2 = self._generate_random_subvolume_name(2)
+        group = self._generate_random_group_name()
+
+        # emulate a v1 subvolume -- in the default group
+        subvol1_path = self._create_v1_subvolume(subvolume1, has_snapshot=False)
+
+        # emulate a v1 subvolume -- in a custom group
+        subvol2_path = self._create_v1_subvolume(subvolume2, subvol_group=group, has_snapshot=False)
+
+        # this would attempt auto-upgrade on access
+        subvolpath1 = self._get_subvolume_path(self.volname, subvolume1)
+        self.assertEqual(subvolpath1, subvol1_path)
+
+        subvolpath2 = self._get_subvolume_path(self.volname, subvolume2, group_name=group)
+        self.assertEqual(subvolpath2, subvol2_path)
+
+        # ensure metadata file is in v2 location, with version retained as v2
+        self._assert_meta_location_and_version(self.volname, subvolume1, version=2)
+        self._assert_meta_location_and_version(self.volname, subvolume2, subvol_group=group, version=2)
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume1)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume2, group)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_rm_with_snapshots(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # remove subvolume -- should fail with ENOTEMPTY since it has snapshots
+        try:
+            self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        except CommandFailedError as ce:
+            if ce.exitstatus != errno.ENOTEMPTY:
+                raise RuntimeError("invalid error code returned when deleting subvolume with snapshots")
+        else:
+            raise RuntimeError("expected subvolume deletion to fail")
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_retain_snapshot_without_snapshots(self):
+        """
+        ensure retain snapshots based delete of a subvolume with no snapshots deletes the subvolume
+        """
+        subvolume = self._generate_random_subvolume_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # remove with snapshot retention (should remove volume, no snapshots to retain)
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # verify list subvolumes returns an empty list
+        subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+        self.assertEqual(len(subvolumels), 0)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_retain_snapshot_with_snapshots(self):
+        """
+        ensure retain snapshots based delete of a subvolume with snapshots retains the subvolume
+        also test allowed and disallowed operations on a retained subvolume
+        """
+        snap_md = ["created_at", "data_pool", "has_pending_clones", "size"]
+
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # remove subvolume -- should fail with ENOTEMPTY since it has snapshots
+        try:
+            self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of retained subvolume with snapshots")
+        else:
+            self.fail("expected rm of subvolume with retained snapshots to fail")
+
+        # remove with snapshot retention
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # fetch info
+        subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
+        self.assertEqual(subvol_info["state"], "snapshot-retained",
+                         msg="expected state to be 'snapshot-retained', found '{0}'".format(subvol_info["state"]))
+
+        ## test allowed ops in retained state
+        # ls
+        subvolumes = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+        self.assertEqual(len(subvolumes), 1, "subvolume ls count mismatch, expected '1', found {0}".format(len(subvolumes)))
+        self.assertEqual(subvolumes[0]['name'], subvolume,
+                         "subvolume name mismatch in ls output, expected '{0}', found '{1}'".format(subvolume, subvolumes[0]['name']))
+
+        # snapshot info
+        snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot))
+        for md in snap_md:
+            self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
+        self.assertEqual(snap_info["has_pending_clones"], "no")
+
+        # rm --force (allowed but should fail)
+        try:
+            self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--force")
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of subvolume with retained snapshots")
+        else:
+            self.fail("expected rm of subvolume with retained snapshots to fail")
+
+        # rm (allowed but should fail)
+        try:
+            self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of subvolume with retained snapshots")
+        else:
+            self.fail("expected rm of subvolume with retained snapshots to fail")
+
+        ## test disallowed ops
+        # getpath
+        try:
+            self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on getpath of subvolume with retained snapshots")
+        else:
+            self.fail("expected getpath of subvolume with retained snapshots to fail")
+
+        # resize
+        nsize = self.DEFAULT_FILE_SIZE*1024*1024
+        try:
+            self._fs_cmd("subvolume", "resize", self.volname, subvolume, str(nsize))
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on resize of subvolume with retained snapshots")
+        else:
+            self.fail("expected resize of subvolume with retained snapshots to fail")
+
+        # snap-create
+        try:
+            self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, "fail")
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on snapshot create of subvolume with retained snapshots")
+        else:
+            self.fail("expected snapshot create of subvolume with retained snapshots to fail")
+
+        # remove snapshot (should remove volume)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # verify list subvolumes returns an empty list
+        subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+        self.assertEqual(len(subvolumels), 0)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_retain_snapshot_invalid_recreate(self):
+        """
+        ensure retained subvolume recreate does not leave any incarnations in the subvolume and trash
+        """
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # remove with snapshot retention
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # recreate subvolume with an invalid pool
+        data_pool = "invalid_pool"
+        try:
+            self._fs_cmd("subvolume", "create", self.volname, subvolume, "--pool_layout", data_pool)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on recreate of subvolume with invalid poolname")
+        else:
+            self.fail("expected recreate of subvolume with invalid poolname to fail")
+
+        # fetch info (the failed recreate should leave the subvolume in snapshot-retained state)
+        subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
+        self.assertEqual(subvol_info["state"], "snapshot-retained",
+                         msg="expected state to be 'snapshot-retained', found '{0}'".format(subvol_info["state"]))
+
+        # getpath
+        try:
+            self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on getpath of subvolume with retained snapshots")
+        else:
+            self.fail("expected getpath of subvolume with retained snapshots to fail")
+
+        # remove snapshot (should remove the retained subvolume)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_retain_snapshot_trash_busy_recreate(self):
+        """
+        ensure retained subvolume recreate fails if its trash is not yet purged
+        """
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # remove with snapshot retention
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # fake a trash entry
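+        # (creates a dummy entry under the subvolume's trash to simulate a purge that is still pending)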
+        self._update_fake_trash(subvolume)
+
+        # recreate subvolume (should fail while the purge is pending)
+        try:
+            self._fs_cmd("subvolume", "create", self.volname, subvolume)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.EAGAIN, "invalid error code on recreate of subvolume with purge pending")
+        else:
+            self.fail("expected recreate of subvolume with purge pending to fail")
+
+        # clear fake trash entry
+        self._update_fake_trash(subvolume, create=False)
+
+        # recreate subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_retain_snapshot_trash_busy_recreate_clone(self):
+        """
+        ensure retained clone recreate fails if its trash is not yet purged
+        """
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # clone subvolume snapshot
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
+        # snapshot clone
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, clone, snapshot)
+
+        # remove clone with snapshot retention
+        self._fs_cmd("subvolume", "rm", self.volname, clone, "--retain-snapshots")
+
+        # fake a trash entry
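+        # (again simulate a pending purge so that recreating the clone is refused)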
+        self._update_fake_trash(clone)
+
+        # clone subvolume snapshot (recreate)
+        try:
+            self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.EAGAIN, "invalid error code on recreate of clone with purge pending")
+        else:
+            self.fail("expected recreate of clone with purge pending to fail")
+
+        # clear fake trash entry
+        self._update_fake_trash(clone, create=False)
+
+        # recreate the clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
+        # remove snapshots
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, clone, snapshot)
+
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_retain_snapshot_recreate_subvolume(self):
+        """
+        ensure a retained subvolume can be recreated and further snapshotted
+        """
+        snap_md = ["created_at", "data_pool", "has_pending_clones", "size"]
+
+        subvolume = self._generate_random_subvolume_name()
+        snapshot1, snapshot2 = self._generate_random_snapshot_name(2)
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot1)
+
+        # remove with snapshot retention
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # fetch info
+        subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
+        self.assertEqual(subvol_info["state"], "snapshot-retained",
+                         msg="expected state to be 'snapshot-retained', found '{0}'".format(subvol_info["state"]))
+
+        # recreate retained subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # fetch info
+        subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
+        self.assertEqual(subvol_info["state"], "complete",
+                         msg="expected state to be 'complete', found '{0}'".format(subvol_info["state"]))
+
+        # snapshot info (older snapshot)
+        snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot1))
+        for md in snap_md:
+            self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
+        self.assertEqual(snap_info["has_pending_clones"], "no")
+
+        # snap-create (new snapshot)
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot2)
+
+        # remove with snapshot retention
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # list snapshots
+        subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume))
+        self.assertEqual(len(subvolsnapshotls), 2, "Expected the 'fs subvolume snapshot ls' command to list the"
+                         " created subvolume snapshots")
+        snapshotnames = [snapshot['name'] for snapshot in subvolsnapshotls]
+        for snap in [snapshot1, snapshot2]:
+            self.assertIn(snap, snapshotnames, "Missing snapshot '{0}' in snapshot list".format(snap))
+
+        # remove snapshots (should remove the retained subvolume)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot1)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot2)
+
+        # verify list subvolumes returns an empty list
+        subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+        self.assertEqual(len(subvolumels), 0)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_retain_snapshot_clone(self):
+        """
+        clone a snapshot from a snapshot-retained subvolume
+        """
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # store path for clone verification
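+        # (getpath is unavailable once the subvolume is removed with --retain-snapshots,
+        # so the path is captured now for the clone verification below)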
+        subvol_path = self._get_subvolume_path(self.volname, subvolume)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=16)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # remove with snapshot retention
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # clone retained subvolume snapshot
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone, subvol_path=subvol_path)
+
+        # remove snapshot (removes the retained subvolume)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove clone
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+        # verify list subvolumes returns an empty list
+        subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+        self.assertEqual(len(subvolumels), 0)
 
         # verify trash dir is clean
         self._wait_for_trash_empty()
 
-    def test_subvolume_snapshot_clone(self):
+    def test_subvolume_retain_snapshot_recreate(self):
+        """
+        recreate a subvolume from one of its retained snapshots
+        """
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # store path for clone verification
+        subvol_path = self._get_subvolume_path(self.volname, subvolume)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=16)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # remove with snapshot retention
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # recreate the retained subvolume by cloning one of its own snapshots
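+        # (the clone target is the retained subvolume's own name, so the subvolume comes back with the snapshot's data)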
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, subvolume)
+
+        # check clone status
+        self._wait_for_clone_to_complete(subvolume)
+
+        # verify clone
+        self._verify_clone(subvolume, snapshot, subvolume, subvol_path=subvol_path)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+        # remove subvolume
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+        # verify list subvolumes returns an empty list
+        subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+        self.assertEqual(len(subvolumels), 0)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_clone_retain_snapshot_with_snapshots(self):
+        """
+        retain snapshots of a cloned subvolume and check disallowed operations
+        """
+        subvolume = self._generate_random_subvolume_name()
+        snapshot1, snapshot2 = self._generate_random_snapshot_name(2)
+        clone = self._generate_random_clone_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # store path for clone verification
+        subvol1_path = self._get_subvolume_path(self.volname, subvolume)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=16)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot1)
+
+        # remove with snapshot retention
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # clone retained subvolume snapshot
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot1, clone)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
+        # verify clone
+        self._verify_clone(subvolume, snapshot1, clone, subvol_path=subvol1_path)
+
+        # create a snapshot on the clone
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, clone, snapshot2)
+
+        # remove the clone, retaining its snapshot
+        self._fs_cmd("subvolume", "rm", self.volname, clone, "--retain-snapshots")
+
+        # list snapshots
+        clonesnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, clone))
+        self.assertEqual(len(clonesnapshotls), 1, "Expected the 'fs subvolume snapshot ls' command to list the"
+                         " created subvolume snapshots")
+        snapshotnames = [snapshot['name'] for snapshot in clonesnapshotls]
+        for snap in [snapshot2]:
+            self.assertIn(snap, snapshotnames, "Missing snapshot '{0}' in snapshot list".format(snap))
+
+        ## check disallowed operations on retained clone
+        # clone-status
+        try:
+            self._fs_cmd("clone", "status", self.volname, clone)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on clone status of clone with retained snapshots")
+        else:
+            self.fail("expected clone status of clone with retained snapshots to fail")
+
+        # clone-cancel
+        try:
+            self._fs_cmd("clone", "cancel", self.volname, clone)
+        except CommandFailedError as ce:
+            self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on clone cancel of clone with retained snapshots")
+        else:
+            self.fail("expected clone cancel of clone with retained snapshots to fail")
+
+        # remove snapshots (removes subvolumes as all are in retained state)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot1)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, clone, snapshot2)
+
+        # verify list subvolumes returns an empty list
+        subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+        self.assertEqual(len(subvolumels), 0)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_retain_snapshot_clone_from_newer_snapshot(self):
+        """
+        clone a subvolume from a recreated subvolume's latest snapshot
+        """
+        subvolume = self._generate_random_subvolume_name()
+        snapshot1, snapshot2 = self._generate_random_snapshot_name(2)
+        clone = self._generate_random_clone_name(1)
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=16)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot1)
+
+        # remove with snapshot retention
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # recreate subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # get and store path for clone verification
+        subvol2_path = self._get_subvolume_path(self.volname, subvolume)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=16)
+
+        # snapshot newer subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot2)
+
+        # remove with snapshot retention
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+        # clone retained subvolume's newer snapshot
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot2, clone)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
+        # verify clone
+        self._verify_clone(subvolume, snapshot2, clone, subvol_path=subvol2_path)
+
+        # remove snapshots (this also removes the retained subvolume)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot1)
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot2)
+
+        # remove clone
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+        # verify list subvolumes returns an empty list
+        subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+        self.assertEqual(len(subvolumels), 0)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_protect_unprotect_sanity(self):
+        """
+        Snapshot protect/unprotect commands are deprecated. This test exists to ensure that
+        invoking these commands does not cause errors, until they are removed in a subsequent release.
+        """
         subvolume = self._generate_random_subvolume_name()
         snapshot = self._generate_random_snapshot_name()
         clone = self._generate_random_clone_name()
@@ -1709,17 +2717,56 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
+        # now, protect snapshot
+        self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
+
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
+        # now, unprotect snapshot
+        self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_snapshot_clone(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+
+        # create subvolume
+        self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=64)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # schedule a clone
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
         # verify clone
-        self._verify_clone(subvolume, clone)
+        self._verify_clone(subvolume, snapshot, clone)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
@@ -1771,12 +2818,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone, clone_pool=new_pool)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone)
-
         subvol_path = self._get_subvolume_path(self.volname, clone)
         desired_pool = self.mount_a.getfattr(subvol_path, "ceph.dir.layout.pool")
         self.assertEqual(desired_pool, new_pool)
@@ -1796,6 +2843,9 @@ class TestVolumes(CephFSTestCase):
         mode = "777"
         uid  = "1000"
         gid  = "1000"
+        new_uid  = "1001"
+        new_gid  = "1001"
+        new_mode = "700"
 
         # create subvolume
         self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode", mode, "--uid", uid, "--gid", gid)
@@ -1806,17 +2856,64 @@ class TestVolumes(CephFSTestCase):
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
+        # change subvolume attrs (to ensure clone picks up snapshot attrs)
+        self._do_subvolume_attr_update(subvolume, new_uid, new_gid, new_mode)
+
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
+        # remove subvolumes
+        self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+        self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+        # verify trash dir is clean
+        self._wait_for_trash_empty()
+
+    def test_subvolume_clone_inherit_snapshot_namespace_and_size(self):
+        subvolume = self._generate_random_subvolume_name()
+        snapshot = self._generate_random_snapshot_name()
+        clone = self._generate_random_clone_name()
+        osize = self.DEFAULT_FILE_SIZE*1024*1024*12
+
+        # create subvolume, in an isolated namespace with a specified size
+        self._fs_cmd("subvolume", "create", self.volname, subvolume, "--namespace-isolated", "--size", str(osize))
+
+        # do some IO
+        self._do_subvolume_io(subvolume, number_of_files=8)
+
+        # snapshot subvolume
+        self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+        # create a pool different from current subvolume pool
+        subvol_path = self._get_subvolume_path(self.volname, subvolume)
+        default_pool = self.mount_a.getfattr(subvol_path, "ceph.dir.layout.pool")
+        new_pool = "new_pool"
+        self.assertNotEqual(default_pool, new_pool)
+        self.fs.add_data_pool(new_pool)
+
+        # update source subvolume pool and clear its pool namespace
+        self._do_subvolume_pool_and_namespace_update(subvolume, pool=new_pool, pool_namespace="")
+
+        # schedule a clone, with NO --pool specification
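+        # with no --pool given, the clone should inherit the namespace and size captured in the
+        # snapshot rather than the source subvolume's updated layout (checked by _verify_clone below)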
+        self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+        # check clone status
+        self._wait_for_clone_to_complete(clone)
+
         # verify clone
-        self._verify_clone(subvolume, clone)
+        self._verify_clone(subvolume, snapshot, clone)
+
+        # remove snapshot
+        self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
@@ -1845,12 +2942,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone1)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone1)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone1)
-
         # now the clone is just like a normal subvolume -- snapshot the clone and fork
         # another clone. before that, do some IO so it can be differentiated.
         self._do_subvolume_io(clone1, create_dir="data", number_of_files=32)
@@ -1864,12 +2961,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone2)
 
+        # verify clone
+        self._verify_clone(clone1, snapshot, clone2)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, clone1, snapshot)
 
-        # verify clone
-        self._verify_clone(clone1, clone2)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
         self._fs_cmd("subvolume", "rm", self.volname, clone1)
@@ -1902,12 +2999,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone, clone_group=group)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone, clone_group=group)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone, clone_group=group)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
         self._fs_cmd("subvolume", "rm", self.volname, clone, group)
@@ -1942,12 +3039,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone, source_group=group)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, group)
 
-        # verify clone
-        self._verify_clone(subvolume, clone, source_group=group)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
         self._fs_cmd("subvolume", "rm", self.volname, clone)
@@ -1984,12 +3081,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone, clone_group=c_group)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone, source_group=s_group, clone_group=c_group)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, s_group)
 
-        # verify clone
-        self._verify_clone(subvolume, clone, source_group=s_group, clone_group=c_group)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume, s_group)
         self._fs_cmd("subvolume", "rm", self.volname, clone, c_group)
@@ -2006,6 +3103,7 @@ class TestVolumes(CephFSTestCase):
         yet another poor man's upgrade test -- rather than going through a full
         upgrade cycle, emulate old-type subvolumes by going through the wormhole
         and verify clone operation.
+        further ensure that a legacy subvolume is not updated to v2, but the clone is.
         """
         subvolume = self._generate_random_subvolume_name()
         snapshot = self._generate_random_snapshot_name()
@@ -2015,12 +3113,19 @@ class TestVolumes(CephFSTestCase):
         createpath = os.path.join(".", "volumes", "_nogroup", subvolume)
         self.mount_a.run_shell(['mkdir', '-p', createpath])
 
+        # add required xattrs to subvolume
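+        # (the legacy subvolume is just a plain directory here, so the pool layout xattr has to be set by hand)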
+        default_pool = self.mount_a.getfattr(".", "ceph.dir.layout.pool")
+        self.mount_a.setfattr(createpath, 'ceph.dir.layout.pool', default_pool)
+
         # do some IO
         self._do_subvolume_io(subvolume, number_of_files=64)
 
         # snapshot subvolume
         self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
 
+        # ensure metadata file is in legacy location, with required version v1
+        self._assert_meta_location_and_version(self.volname, subvolume, version=1, legacy=True)
+
         # schedule a clone
         self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
 
@@ -2035,11 +3140,14 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone, source_version=1)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone)
+        # ensure metadata file is in v2 location, with required version v2
+        self._assert_meta_location_and_version(self.volname, clone)
 
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
@@ -2081,12 +3189,12 @@ class TestVolumes(CephFSTestCase):
         subvolpath = self._get_subvolume_path(self.volname, clone)
         self.assertNotEqual(subvolpath, None)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
         self._fs_cmd("subvolume", "rm", self.volname, clone)
@@ -2126,12 +3234,12 @@ class TestVolumes(CephFSTestCase):
         subvolpath = self._get_subvolume_path(self.volname, clone)
         self.assertNotEqual(subvolpath, None)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
         self._fs_cmd("subvolume", "rm", self.volname, clone)
@@ -2171,12 +3279,12 @@ class TestVolumes(CephFSTestCase):
         subvolpath = self._get_subvolume_path(self.volname, clone)
         self.assertNotEqual(subvolpath, None)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
         self._fs_cmd("subvolume", "rm", self.volname, clone)
@@ -2242,12 +3350,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
+        # verify clone
+        self._verify_clone(subvolume1, snapshot, clone)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume1, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume1, clone)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume1)
         self._fs_cmd("subvolume", "rm", self.volname, subvolume2)
@@ -2288,7 +3396,7 @@ class TestVolumes(CephFSTestCase):
         self._wait_for_clone_to_complete(clone1)
 
         # verify clone
-        self._verify_clone(subvolume, clone1)
+        self._verify_clone(subvolume, snapshot, clone1, clone_pool=new_pool)
 
         # wait a bit so that subsequent I/O will give pool full error
         time.sleep(120)
@@ -2339,12 +3447,12 @@ class TestVolumes(CephFSTestCase):
         # check clone status
         self._wait_for_clone_to_complete(clone)
 
+        # verify clone
+        self._verify_clone(subvolume, snapshot, clone)
+
         # remove snapshot
         self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
 
-        # verify clone
-        self._verify_clone(subvolume, clone)
-
         # remove subvolumes
         self._fs_cmd("subvolume", "rm", self.volname, subvolume)
         self._fs_cmd("subvolume", "rm", self.volname, clone)