import unittest
from hashlib import md5
from textwrap import dedent
+from io import StringIO
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from tasks.cephfs.fuse_mount import FuseMount
time.sleep(1)
self.assertTrue(check < timo)
+ def _get_clone_status(self, clone, clone_group=None):
+ args = ["clone", "status", self.volname, clone]
+ if clone_group:
+ args.append(clone_group)
+ args = tuple(args)
+ result = json.loads(self._fs_cmd(*args))
+ return result
+
def _wait_for_clone_to_complete(self, clone, clone_group=None, timo=120):
self.__check_clone_state("complete", clone, clone_group, timo)
def _wait_for_clone_to_fail(self, clone, clone_group=None, timo=120):
self.__check_clone_state("failed", clone, clone_group, timo)
+ def _wait_for_clone_to_be_in_progress(self, clone, clone_group=None, timo=120):
+ self.__check_clone_state("in-progress", clone, clone_group, timo)
+
def _check_clone_canceled(self, clone, clone_group=None):
self.__check_clone_state("canceled", clone, clone_group, timo=1)
# verify trash dir is clean
self._wait_for_trash_empty()
-
-class TestSubvolumeGroupSnapshots(TestVolumesHelper):
- """Tests for FS subvolume group snapshot operations."""
- @unittest.skip("skipping subvolumegroup snapshot tests")
- def test_nonexistent_subvolume_group_snapshot_rm(self):
+ def test_subvolume_retain_snapshot_rm_idempotency(self):
+ """
+ ensure subvolume deletion of a subvolume which is already deleted with retain snapshots option passes.
+ After subvolume deletion with retain snapshots, the subvolume exists until the trash directory (resides inside subvolume)
+ is cleaned up. The subvolume deletion issued while the trash directory is not empty, should pass and should
+ not error out with EAGAIN.
+ """
subvolume = self._generate_random_subvolume_name()
- group = self._generate_random_group_name()
snapshot = self._generate_random_snapshot_name()
- # create group
- self._fs_cmd("subvolumegroup", "create", self.volname, group)
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
- # create subvolume in group
- self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+ # do some IO
+ self._do_subvolume_io(subvolume, number_of_files=256)
- # snapshot group
- self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
- # remove snapshot
- self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot)
+ # remove with snapshot retention
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
- # remove snapshot
+ # remove snapshots (removes retained volume)
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # remove subvolume (check idempotency)
try:
- self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
except CommandFailedError as ce:
if ce.exitstatus != errno.ENOENT:
- raise
- else:
- raise RuntimeError("expected the 'fs subvolumegroup snapshot rm' command to fail")
-
- # remove subvolume
- self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+ self.fail(f"expected subvolume rm to pass with error: {os.strerror(ce.exitstatus)}")
# verify trash dir is clean
self._wait_for_trash_empty()
- # remove group
- self._fs_cmd("subvolumegroup", "rm", self.volname, group)
- @unittest.skip("skipping subvolumegroup snapshot tests")
- def test_subvolume_group_snapshot_create_and_rm(self):
- subvolume = self._generate_random_subvolume_name()
+ def test_subvolume_user_metadata_set(self):
+ subvolname = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
- snapshot = self._generate_random_snapshot_name()
- # create group
+ # create group.
self._fs_cmd("subvolumegroup", "create", self.volname, group)
- # create subvolume in group
- self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
-
- # snapshot group
- self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
- # remove snapshot
- self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot)
+ # set metadata for subvolume.
+ key = "key"
+ value = "value"
+ try:
+ self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume metadata set' command to succeed")
- # remove subvolume
- self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
- # verify trash dir is clean
+ # verify trash dir is clean.
self._wait_for_trash_empty()
- # remove group
- self._fs_cmd("subvolumegroup", "rm", self.volname, group)
-
- @unittest.skip("skipping subvolumegroup snapshot tests")
- def test_subvolume_group_snapshot_idempotence(self):
- subvolume = self._generate_random_subvolume_name()
+ def test_subvolume_user_metadata_set_idempotence(self):
+ subvolname = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
- snapshot = self._generate_random_snapshot_name()
- # create group
+ # create group.
self._fs_cmd("subvolumegroup", "create", self.volname, group)
- # create subvolume in group
- self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
-
- # snapshot group
- self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
-
- # try creating snapshot w/ same snapshot name -- shoule be idempotent
- self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
-
- # remove snapshot
- self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot)
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
- # remove subvolume
- self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+ # set metadata for subvolume.
+ key = "key"
+ value = "value"
+ try:
+ self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume metadata set' command to succeed")
- # verify trash dir is clean
- self._wait_for_trash_empty()
+ # set same metadata again for subvolume.
+ try:
+ self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume metadata set' command to succeed because it is idempotent operation")
- # remove group
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
- @unittest.skip("skipping subvolumegroup snapshot tests")
- def test_subvolume_group_snapshot_ls(self):
- # tests the 'fs subvolumegroup snapshot ls' command
-
- snapshots = []
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
- # create group
+ def test_subvolume_user_metadata_get(self):
+ subvolname = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
+
+ # create group.
self._fs_cmd("subvolumegroup", "create", self.volname, group)
- # create subvolumegroup snapshots
- snapshots = self._generate_random_snapshot_name(3)
- for snapshot in snapshots:
- self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
- subvolgrpsnapshotls = json.loads(self._fs_cmd('subvolumegroup', 'snapshot', 'ls', self.volname, group))
- if len(subvolgrpsnapshotls) == 0:
- raise RuntimeError("Expected the 'fs subvolumegroup snapshot ls' command to list the created subvolume group snapshots")
- else:
- snapshotnames = [snapshot['name'] for snapshot in subvolgrpsnapshotls]
- if collections.Counter(snapshotnames) != collections.Counter(snapshots):
- raise RuntimeError("Error creating or listing subvolume group snapshots")
+ # set metadata for subvolume.
+ key = "key"
+ value = "value"
+ self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
- @unittest.skip("skipping subvolumegroup snapshot tests")
- def test_subvolume_group_snapshot_rm_force(self):
- # test removing non-existing subvolume group snapshot with --force
- group = self._generate_random_group_name()
- snapshot = self._generate_random_snapshot_name()
- # remove snapshot
+ # get value for specified key.
try:
- self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot, "--force")
+ ret = self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, key, "--group_name", group)
except CommandFailedError:
- raise RuntimeError("expected the 'fs subvolumegroup snapshot rm --force' command to succeed")
-
- def test_subvolume_group_snapshot_unsupported_status(self):
- group = self._generate_random_group_name()
- snapshot = self._generate_random_snapshot_name()
+ self.fail("expected the 'fs subvolume metadata get' command to succeed")
- # create group
- self._fs_cmd("subvolumegroup", "create", self.volname, group)
+ # remove '\n' from returned value.
+ ret = ret.strip('\n')
- # snapshot group
- try:
- self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
- except CommandFailedError as ce:
- self.assertEqual(ce.exitstatus, errno.ENOSYS, "invalid error code on subvolumegroup snapshot create")
- else:
- self.fail("expected subvolumegroup snapshot create command to fail")
+ # match received value with expected value.
+ self.assertEqual(value, ret)
- # remove group
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
-class TestSubvolumeSnapshots(TestVolumesHelper):
- """Tests for FS subvolume snapshot operations."""
- def test_nonexistent_subvolume_snapshot_rm(self):
- subvolume = self._generate_random_subvolume_name()
- snapshot = self._generate_random_snapshot_name()
+ def test_subvolume_user_metadata_get_for_nonexisting_key(self):
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
- # create subvolume
- self._fs_cmd("subvolume", "create", self.volname, subvolume)
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
- # snapshot subvolume
- self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
- # remove snapshot
- self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+ # set metadata for subvolume.
+ key = "key"
+ value = "value"
+ self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
- # remove snapshot again
+ # try to get value for nonexisting key
+ # Expecting ENOENT exit status because key does not exist
try:
- self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
- except CommandFailedError as ce:
- if ce.exitstatus != errno.ENOENT:
- raise
+ self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, "key_nonexist", "--group_name", group)
+ except CommandFailedError as e:
+ self.assertEqual(e.exitstatus, errno.ENOENT)
else:
- raise RuntimeError("expected the 'fs subvolume snapshot rm' command to fail")
+ self.fail("Expected ENOENT because 'key_nonexist' does not exist")
- # remove subvolume
- self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
- # verify trash dir is clean
+ # verify trash dir is clean.
self._wait_for_trash_empty()
- def test_subvolume_snapshot_create_and_rm(self):
- subvolume = self._generate_random_subvolume_name()
- snapshot = self._generate_random_snapshot_name()
+ def test_subvolume_user_metadata_get_for_nonexisting_section(self):
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
- # create subvolume
- self._fs_cmd("subvolume", "create", self.volname, subvolume)
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
- # snapshot subvolume
- self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
- # remove snapshot
- self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+ # try to get value for nonexisting key (as section does not exist)
+ # Expecting ENOENT exit status because key does not exist
+ try:
+ self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, "key", "--group_name", group)
+ except CommandFailedError as e:
+ self.assertEqual(e.exitstatus, errno.ENOENT)
+ else:
+ self.fail("Expected ENOENT because section does not exist")
- # remove subvolume
- self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
- # verify trash dir is clean
+ # verify trash dir is clean.
self._wait_for_trash_empty()
- def test_subvolume_snapshot_create_idempotence(self):
- subvolume = self._generate_random_subvolume_name()
- snapshot = self._generate_random_snapshot_name()
+ def test_subvolume_user_metadata_update(self):
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
- # create subvolume
- self._fs_cmd("subvolume", "create", self.volname, subvolume)
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
- # snapshot subvolume
- self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
- # try creating w/ same subvolume snapshot name -- should be idempotent
- self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+ # set metadata for subvolume.
+ key = "key"
+ value = "value"
+ self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
- # remove snapshot
- self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+ # update metadata against key.
+ new_value = "new_value"
+ self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, new_value, "--group_name", group)
- # remove subvolume
- self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ # get metadata for specified key of subvolume.
+ try:
+ ret = self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, key, "--group_name", group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume metadata get' command to succeed")
- # verify trash dir is clean
- self._wait_for_trash_empty()
+ # remove '\n' from returned value.
+ ret = ret.strip('\n')
- def test_subvolume_snapshot_info(self):
+ # match received value with expected value.
+ self.assertEqual(new_value, ret)
- """
- tests the 'fs subvolume snapshot info' command
- """
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
- snap_md = ["created_at", "data_pool", "has_pending_clones", "size"]
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
- subvolume = self._generate_random_subvolume_name()
- snapshot, snap_missing = self._generate_random_snapshot_name(2)
+ def test_subvolume_user_metadata_list(self):
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
- # create subvolume
- self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
- # do some IO
- self._do_subvolume_io(subvolume, number_of_files=1)
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
- # snapshot subvolume
- self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+ # set metadata for subvolume.
+ input_metadata_dict = {f'key_{i}' : f'value_{i}' for i in range(3)}
- snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot))
- for md in snap_md:
- self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
- self.assertEqual(snap_info["has_pending_clones"], "no")
+ for k, v in input_metadata_dict.items():
+ self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, k, v, "--group_name", group)
- # snapshot info for non-existent snapshot
+ # list metadata
try:
- self._get_subvolume_snapshot_info(self.volname, subvolume, snap_missing)
- except CommandFailedError as ce:
- self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on snapshot info of non-existent snapshot")
- else:
- self.fail("expected snapshot info of non-existent snapshot to fail")
+ ret = self._fs_cmd("subvolume", "metadata", "ls", self.volname, subvolname, "--group_name", group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume metadata ls' command to succeed")
- # remove snapshot
- self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+ ret_dict = json.loads(ret)
- # remove subvolume
- self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ # compare output with expected output
+ self.assertDictEqual(input_metadata_dict, ret_dict)
- # verify trash dir is clean
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ # verify trash dir is clean.
self._wait_for_trash_empty()
- def test_subvolume_snapshot_in_group(self):
- subvolume = self._generate_random_subvolume_name()
+ def test_subvolume_user_metadata_list_if_no_metadata_set(self):
+ subvolname = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
- snapshot = self._generate_random_snapshot_name()
- # create group
+ # create group.
self._fs_cmd("subvolumegroup", "create", self.volname, group)
- # create subvolume in group
- self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
- # snapshot subvolume in group
- self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot, group)
+ # list metadata
+ try:
+ ret = self._fs_cmd("subvolume", "metadata", "ls", self.volname, subvolname, "--group_name", group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume metadata ls' command to succeed")
- # remove snapshot
- self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, group)
+ # remove '\n' from returned value.
+ ret = ret.strip('\n')
- # remove subvolume
- self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+ # compare output with expected output
+ # expecting empty json/dictionary
+ self.assertEqual(ret, "{}")
- # verify trash dir is clean
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ # verify trash dir is clean.
self._wait_for_trash_empty()
- # remove group
- self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+ def test_subvolume_user_metadata_remove(self):
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
- def test_subvolume_snapshot_ls(self):
- # tests the 'fs subvolume snapshot ls' command
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
- snapshots = []
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
- # create subvolume
- subvolume = self._generate_random_subvolume_name()
- self._fs_cmd("subvolume", "create", self.volname, subvolume)
+ # set metadata for subvolume.
+ key = "key"
+ value = "value"
+ self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
- # create subvolume snapshots
- snapshots = self._generate_random_snapshot_name(3)
- for snapshot in snapshots:
- self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+ # remove metadata against specified key.
+ try:
+ self._fs_cmd("subvolume", "metadata", "rm", self.volname, subvolname, key, "--group_name", group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume metadata rm' command to succeed")
- subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume))
- if len(subvolsnapshotls) == 0:
- self.fail("Expected the 'fs subvolume snapshot ls' command to list the created subvolume snapshots")
+ # confirm key is removed by again fetching metadata
+ try:
+ self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, key, "--group_name", group)
+ except CommandFailedError as e:
+ self.assertEqual(e.exitstatus, errno.ENOENT)
else:
- snapshotnames = [snapshot['name'] for snapshot in subvolsnapshotls]
- if collections.Counter(snapshotnames) != collections.Counter(snapshots):
- self.fail("Error creating or listing subvolume snapshots")
-
- # remove snapshot
- for snapshot in snapshots:
- self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+ self.fail("Expected ENOENT because key does not exist")
- # remove subvolume
- self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
- # verify trash dir is clean
+ # verify trash dir is clean.
self._wait_for_trash_empty()
- def test_subvolume_inherited_snapshot_ls(self):
- # tests the scenario where 'fs subvolume snapshot ls' command
- # should not list inherited snapshots created as part of snapshot
- # at ancestral level
-
- snapshots = []
- subvolume = self._generate_random_subvolume_name()
+ def test_subvolume_user_metadata_remove_for_nonexisting_key(self):
+ subvolname = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
- snap_count = 3
- # create group
+ # create group.
self._fs_cmd("subvolumegroup", "create", self.volname, group)
- # create subvolume in group
- self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
- # create subvolume snapshots
- snapshots = self._generate_random_snapshot_name(snap_count)
- for snapshot in snapshots:
- self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot, group)
+ # set metadata for subvolume.
+ key = "key"
+ value = "value"
+ self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
- # Create snapshot at ancestral level
- ancestral_snappath1 = os.path.join(".", "volumes", group, ".snap", "ancestral_snap_1")
- ancestral_snappath2 = os.path.join(".", "volumes", group, ".snap", "ancestral_snap_2")
- self.mount_a.run_shell(['mkdir', '-p', ancestral_snappath1, ancestral_snappath2], sudo=True)
+ # try to remove value for nonexisting key
+ # Expecting ENOENT exit status because key does not exist
+ try:
+ self._fs_cmd("subvolume", "metadata", "rm", self.volname, subvolname, "key_nonexist", "--group_name", group)
+ except CommandFailedError as e:
+ self.assertEqual(e.exitstatus, errno.ENOENT)
+ else:
+ self.fail("Expected ENOENT because 'key_nonexist' does not exist")
- subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume, group))
- self.assertEqual(len(subvolsnapshotls), snap_count)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
- # remove ancestral snapshots
- self.mount_a.run_shell(['rmdir', ancestral_snappath1, ancestral_snappath2], sudo=True)
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
- # remove snapshot
- for snapshot in snapshots:
- self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, group)
+ def test_subvolume_user_metadata_remove_for_nonexisting_section(self):
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
- # remove subvolume
- self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
- # verify trash dir is clean
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
+
+ # try to remove value for nonexisting key (as section does not exist)
+ # Expecting ENOENT exit status because key does not exist
+ try:
+ self._fs_cmd("subvolume", "metadata", "rm", self.volname, subvolname, "key", "--group_name", group)
+ except CommandFailedError as e:
+ self.assertEqual(e.exitstatus, errno.ENOENT)
+ else:
+ self.fail("Expected ENOENT because section does not exist")
+
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ # verify trash dir is clean.
self._wait_for_trash_empty()
- # remove group
+ def test_subvolume_user_metadata_remove_force(self):
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
+
+ # set metadata for subvolume.
+ key = "key"
+ value = "value"
+ self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
+
+ # remove metadata against specified key with --force option.
+ try:
+ self._fs_cmd("subvolume", "metadata", "rm", self.volname, subvolname, key, "--group_name", group, "--force")
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume metadata rm' command to succeed")
+
+ # confirm key is removed by again fetching metadata
+ try:
+ self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, key, "--group_name", group)
+ except CommandFailedError as e:
+ self.assertEqual(e.exitstatus, errno.ENOENT)
+ else:
+ self.fail("Expected ENOENT because key does not exist")
+
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
- def test_subvolume_inherited_snapshot_info(self):
- """
- tests the scenario where 'fs subvolume snapshot info' command
- should fail for inherited snapshots created as part of snapshot
- at ancestral level
- """
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
- subvolume = self._generate_random_subvolume_name()
+ def test_subvolume_user_metadata_remove_force_for_nonexisting_key(self):
+ subvolname = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
- # create group
+ # create group.
self._fs_cmd("subvolumegroup", "create", self.volname, group)
- # create subvolume in group
- self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, "--group_name", group)
- # Create snapshot at ancestral level
- ancestral_snap_name = "ancestral_snap_1"
- ancestral_snappath1 = os.path.join(".", "volumes", group, ".snap", ancestral_snap_name)
- self.mount_a.run_shell(['mkdir', '-p', ancestral_snappath1], sudo=True)
+ # set metadata for subvolume.
+ key = "key"
+ value = "value"
+ self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
- # Validate existence of inherited snapshot
- group_path = os.path.join(".", "volumes", group)
- inode_number_group_dir = int(self.mount_a.run_shell(['stat', '-c' '%i', group_path]).stdout.getvalue().strip())
- inherited_snap = "_{0}_{1}".format(ancestral_snap_name, inode_number_group_dir)
- inherited_snappath = os.path.join(".", "volumes", group, subvolume,".snap", inherited_snap)
- self.mount_a.run_shell(['ls', inherited_snappath])
+ # remove metadata against specified key.
+ try:
+ self._fs_cmd("subvolume", "metadata", "rm", self.volname, subvolname, key, "--group_name", group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume metadata rm' command to succeed")
- # snapshot info on inherited snapshot
+ # confirm key is removed by again fetching metadata
try:
- self._get_subvolume_snapshot_info(self.volname, subvolume, inherited_snap, group)
- except CommandFailedError as ce:
- self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on snapshot info of inherited snapshot")
+ self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, key, "--group_name", group)
+ except CommandFailedError as e:
+ self.assertEqual(e.exitstatus, errno.ENOENT)
else:
- self.fail("expected snapshot info of inherited snapshot to fail")
+ self.fail("Expected ENOENT because key does not exist")
- # remove ancestral snapshots
- self.mount_a.run_shell(['rmdir', ancestral_snappath1], sudo=True)
+ # again remove metadata against already removed key with --force option.
+ try:
+ self._fs_cmd("subvolume", "metadata", "rm", self.volname, subvolname, key, "--group_name", group, "--force")
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume metadata rm' (with --force) command to succeed")
- # remove subvolume
- self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--group_name", group)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
- # verify trash dir is clean
+ # verify trash dir is clean.
self._wait_for_trash_empty()
- # remove group
+ def test_subvolume_user_metadata_set_and_get_for_legacy_subvolume(self):
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+
+ # emulate a old-fashioned subvolume in a custom group
+ createpath = os.path.join(".", "volumes", group, subvolname)
+ self.mount_a.run_shell(['mkdir', '-p', createpath], sudo=True)
+
+ # set metadata for subvolume.
+ key = "key"
+ value = "value"
+ try:
+ self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, key, value, "--group_name", group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume metadata set' command to succeed")
+
+ # get value for specified key.
+ try:
+ ret = self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, key, "--group_name", group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume metadata get' command to succeed")
+
+ # remove '\n' from returned value.
+ ret = ret.strip('\n')
+
+ # match received value with expected value.
+ self.assertEqual(value, ret)
+
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
- def test_subvolume_inherited_snapshot_rm(self):
- """
- tests the scenario where 'fs subvolume snapshot rm' command
- should fail for inherited snapshots created as part of snapshot
- at ancestral level
- """
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
+
+ def test_subvolume_user_metadata_list_and_remove_for_legacy_subvolume(self):
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+
+ # emulate a old-fashioned subvolume in a custom group
+ createpath = os.path.join(".", "volumes", group, subvolname)
+ self.mount_a.run_shell(['mkdir', '-p', createpath], sudo=True)
+
+ # set metadata for subvolume.
+ input_metadata_dict = {f'key_{i}' : f'value_{i}' for i in range(3)}
+
+ for k, v in input_metadata_dict.items():
+ self._fs_cmd("subvolume", "metadata", "set", self.volname, subvolname, k, v, "--group_name", group)
+
+ # list metadata
+ try:
+ ret = self._fs_cmd("subvolume", "metadata", "ls", self.volname, subvolname, "--group_name", group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume metadata ls' command to succeed")
+
+ ret_dict = json.loads(ret)
+
+ # compare output with expected output
+ self.assertDictEqual(input_metadata_dict, ret_dict)
+
+ # remove metadata against specified key.
+ try:
+ self._fs_cmd("subvolume", "metadata", "rm", self.volname, subvolname, "key_1", "--group_name", group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume metadata rm' command to succeed")
+
+ # confirm key is removed by again fetching metadata
+ try:
+ self._fs_cmd("subvolume", "metadata", "get", self.volname, subvolname, "key_1", "--group_name", group)
+ except CommandFailedError as e:
+ self.assertEqual(e.exitstatus, errno.ENOENT)
+ else:
+ self.fail("Expected ENOENT because key_1 does not exist")
+
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
+
+class TestSubvolumeGroupSnapshots(TestVolumesHelper):
+ """Tests for FS subvolume group snapshot operations."""
+ @unittest.skip("skipping subvolumegroup snapshot tests")
+ def test_nonexistent_subvolume_group_snapshot_rm(self):
subvolume = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
+ snapshot = self._generate_random_snapshot_name()
# create group
self._fs_cmd("subvolumegroup", "create", self.volname, group)
# create subvolume in group
self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
- # Create snapshot at ancestral level
- ancestral_snap_name = "ancestral_snap_1"
- ancestral_snappath1 = os.path.join(".", "volumes", group, ".snap", ancestral_snap_name)
- self.mount_a.run_shell(['mkdir', '-p', ancestral_snappath1], sudo=True)
+ # snapshot group
+ self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
- # Validate existence of inherited snap
- group_path = os.path.join(".", "volumes", group)
- inode_number_group_dir = int(self.mount_a.run_shell(['stat', '-c' '%i', group_path]).stdout.getvalue().strip())
- inherited_snap = "_{0}_{1}".format(ancestral_snap_name, inode_number_group_dir)
- inherited_snappath = os.path.join(".", "volumes", group, subvolume,".snap", inherited_snap)
- self.mount_a.run_shell(['ls', inherited_snappath])
+ # remove snapshot
+ self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot)
- # inherited snapshot should not be deletable
+ # remove snapshot
try:
- self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, inherited_snap, "--group_name", group)
+ self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot)
except CommandFailedError as ce:
- self.assertEqual(ce.exitstatus, errno.EINVAL, msg="invalid error code when removing inherited snapshot")
+ if ce.exitstatus != errno.ENOENT:
+ raise
else:
- self.fail("expected removing inheirted snapshot to fail")
-
- # remove ancestral snapshots
- self.mount_a.run_shell(['rmdir', ancestral_snappath1], sudo=True)
+ raise RuntimeError("expected the 'fs subvolumegroup snapshot rm' command to fail")
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
# remove group
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
- def test_subvolume_subvolumegroup_snapshot_name_conflict(self):
- """
- tests the scenario where creation of subvolume snapshot name
- with same name as it's subvolumegroup snapshot name. This should
- fail.
- """
-
+ @unittest.skip("skipping subvolumegroup snapshot tests")
+ def test_subvolume_group_snapshot_create_and_rm(self):
subvolume = self._generate_random_subvolume_name()
group = self._generate_random_group_name()
- group_snapshot = self._generate_random_snapshot_name()
+ snapshot = self._generate_random_snapshot_name()
# create group
self._fs_cmd("subvolumegroup", "create", self.volname, group)
# create subvolume in group
self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
- # Create subvolumegroup snapshot
- group_snapshot_path = os.path.join(".", "volumes", group, ".snap", group_snapshot)
- self.mount_a.run_shell(['mkdir', '-p', group_snapshot_path], sudo=True)
-
- # Validate existence of subvolumegroup snapshot
- self.mount_a.run_shell(['ls', group_snapshot_path])
-
- # Creation of subvolume snapshot with it's subvolumegroup snapshot name should fail
- try:
- self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, group_snapshot, "--group_name", group)
- except CommandFailedError as ce:
- self.assertEqual(ce.exitstatus, errno.EINVAL, msg="invalid error code when creating subvolume snapshot with same name as subvolume group snapshot")
- else:
- self.fail("expected subvolume snapshot creation with same name as subvolumegroup snapshot to fail")
+ # snapshot group
+ self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
- # remove subvolumegroup snapshot
- self.mount_a.run_shell(['rmdir', group_snapshot_path], sudo=True)
+ # remove snapshot
+ self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot)
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
# remove group
self._fs_cmd("subvolumegroup", "rm", self.volname, group)
- def test_subvolume_retain_snapshot_invalid_recreate(self):
- """
- ensure retained subvolume recreate does not leave any incarnations in the subvolume and trash
- """
+ @unittest.skip("skipping subvolumegroup snapshot tests")
+ def test_subvolume_group_snapshot_idempotence(self):
subvolume = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
snapshot = self._generate_random_snapshot_name()
- # create subvolume
- self._fs_cmd("subvolume", "create", self.volname, subvolume)
+ # create group
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
- # snapshot subvolume
- self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+ # create subvolume in group
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
- # remove with snapshot retention
- self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+ # snapshot group
+ self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
- # recreate subvolume with an invalid pool
- data_pool = "invalid_pool"
+ # try creating snapshot w/ same snapshot name -- shoule be idempotent
+ self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
+
+ # remove snapshot
+ self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ # remove group
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ @unittest.skip("skipping subvolumegroup snapshot tests")
+ def test_subvolume_group_snapshot_ls(self):
+ # tests the 'fs subvolumegroup snapshot ls' command
+
+ snapshots = []
+
+ # create group
+ group = self._generate_random_group_name()
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolumegroup snapshots
+ snapshots = self._generate_random_snapshot_name(3)
+ for snapshot in snapshots:
+ self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
+
+ subvolgrpsnapshotls = json.loads(self._fs_cmd('subvolumegroup', 'snapshot', 'ls', self.volname, group))
+ if len(subvolgrpsnapshotls) == 0:
+ raise RuntimeError("Expected the 'fs subvolumegroup snapshot ls' command to list the created subvolume group snapshots")
+ else:
+ snapshotnames = [snapshot['name'] for snapshot in subvolgrpsnapshotls]
+ if collections.Counter(snapshotnames) != collections.Counter(snapshots):
+ raise RuntimeError("Error creating or listing subvolume group snapshots")
+
+ @unittest.skip("skipping subvolumegroup snapshot tests")
+ def test_subvolume_group_snapshot_rm_force(self):
+ # test removing non-existing subvolume group snapshot with --force
+ group = self._generate_random_group_name()
+ snapshot = self._generate_random_snapshot_name()
+ # remove snapshot
try:
- self._fs_cmd("subvolume", "create", self.volname, subvolume, "--pool_layout", data_pool)
+ self._fs_cmd("subvolumegroup", "snapshot", "rm", self.volname, group, snapshot, "--force")
+ except CommandFailedError:
+ raise RuntimeError("expected the 'fs subvolumegroup snapshot rm --force' command to succeed")
+
+ def test_subvolume_group_snapshot_unsupported_status(self):
+ group = self._generate_random_group_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create group
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # snapshot group
+ try:
+ self._fs_cmd("subvolumegroup", "snapshot", "create", self.volname, group, snapshot)
except CommandFailedError as ce:
- self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on recreate of subvolume with invalid poolname")
+ self.assertEqual(ce.exitstatus, errno.ENOSYS, "invalid error code on subvolumegroup snapshot create")
else:
- self.fail("expected recreate of subvolume with invalid poolname to fail")
+ self.fail("expected subvolumegroup snapshot create command to fail")
- # fetch info
- subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
- self.assertEqual(subvol_info["state"], "snapshot-retained",
- msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+ # remove group
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
- # getpath
+
+class TestSubvolumeSnapshots(TestVolumesHelper):
+ """Tests for FS subvolume snapshot operations."""
+ def test_nonexistent_subvolume_snapshot_rm(self):
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # remove snapshot
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # remove snapshot again
try:
- self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
except CommandFailedError as ce:
- self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on getpath of subvolume with retained snapshots")
+ if ce.exitstatus != errno.ENOENT:
+ raise
else:
- self.fail("expected getpath of subvolume with retained snapshots to fail")
+ raise RuntimeError("expected the 'fs subvolume snapshot rm' command to fail")
- # remove snapshot (should remove volume)
- self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
# verify trash dir is clean
self._wait_for_trash_empty()
- def test_subvolume_retain_snapshot_recreate_subvolume(self):
- """
- ensure a retained subvolume can be recreated and further snapshotted
- """
- snap_md = ["created_at", "data_pool", "has_pending_clones", "size"]
-
+ def test_subvolume_snapshot_create_and_rm(self):
subvolume = self._generate_random_subvolume_name()
- snapshot1, snapshot2 = self._generate_random_snapshot_name(2)
+ snapshot = self._generate_random_snapshot_name()
# create subvolume
self._fs_cmd("subvolume", "create", self.volname, subvolume)
# snapshot subvolume
- self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot1)
-
- # remove with snapshot retention
- self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
- # fetch info
- subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
- self.assertEqual(subvol_info["state"], "snapshot-retained",
- msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+ # remove snapshot
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
- # recreate retained subvolume
- self._fs_cmd("subvolume", "create", self.volname, subvolume)
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
- # fetch info
- subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
- self.assertEqual(subvol_info["state"], "complete",
- msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
- # snapshot info (older snapshot)
- snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot1))
- for md in snap_md:
- self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
- self.assertEqual(snap_info["has_pending_clones"], "no")
+ def test_subvolume_snapshot_create_idempotence(self):
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
- # snap-create (new snapshot)
- self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot2)
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
- # remove with retain snapshots
- self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
- # list snapshots
- subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume))
- self.assertEqual(len(subvolsnapshotls), 2, "Expected the 'fs subvolume snapshot ls' command to list the"
- " created subvolume snapshots")
- snapshotnames = [snapshot['name'] for snapshot in subvolsnapshotls]
- for snap in [snapshot1, snapshot2]:
- self.assertIn(snap, snapshotnames, "Missing snapshot '{0}' in snapshot list".format(snap))
+ # try creating w/ same subvolume snapshot name -- should be idempotent
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
- # remove snapshots (should remove volume)
- self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot1)
- self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot2)
+ # remove snapshot
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
- # verify list subvolumes returns an empty list
- subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
- self.assertEqual(len(subvolumels), 0)
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
# verify trash dir is clean
self._wait_for_trash_empty()
- def test_subvolume_retain_snapshot_with_snapshots(self):
+ def test_subvolume_snapshot_info(self):
+
"""
- ensure retain snapshots based delete of a subvolume with snapshots retains the subvolume
- also test allowed and dis-allowed operations on a retained subvolume
+ tests the 'fs subvolume snapshot info' command
"""
+
snap_md = ["created_at", "data_pool", "has_pending_clones", "size"]
subvolume = self._generate_random_subvolume_name()
- snapshot = self._generate_random_snapshot_name()
+ snapshot, snap_missing = self._generate_random_snapshot_name(2)
# create subvolume
- self._fs_cmd("subvolume", "create", self.volname, subvolume)
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+
+ # do some IO
+ self._do_subvolume_io(subvolume, number_of_files=1)
# snapshot subvolume
self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
- # remove subvolume -- should fail with ENOTEMPTY since it has snapshots
- try:
- self._fs_cmd("subvolume", "rm", self.volname, subvolume)
- except CommandFailedError as ce:
- self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of retained subvolume with snapshots")
- else:
- self.fail("expected rm of subvolume with retained snapshots to fail")
-
- # remove with snapshot retention
- self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
-
- # fetch info
- subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
- self.assertEqual(subvol_info["state"], "snapshot-retained",
- msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
-
- ## test allowed ops in retained state
- # ls
- subvolumes = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
- self.assertEqual(len(subvolumes), 1, "subvolume ls count mismatch, expected '1', found {0}".format(len(subvolumes)))
- self.assertEqual(subvolumes[0]['name'], subvolume,
- "subvolume name mismatch in ls output, expected '{0}', found '{1}'".format(subvolume, subvolumes[0]['name']))
-
- # snapshot info
snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot))
for md in snap_md:
self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
self.assertEqual(snap_info["has_pending_clones"], "no")
- # rm --force (allowed but should fail)
- try:
- self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--force")
- except CommandFailedError as ce:
- self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of subvolume with retained snapshots")
- else:
- self.fail("expected rm of subvolume with retained snapshots to fail")
-
- # rm (allowed but should fail)
- try:
- self._fs_cmd("subvolume", "rm", self.volname, subvolume)
- except CommandFailedError as ce:
- self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of subvolume with retained snapshots")
- else:
- self.fail("expected rm of subvolume with retained snapshots to fail")
-
- ## test disallowed ops
- # getpath
- try:
- self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
- except CommandFailedError as ce:
- self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on getpath of subvolume with retained snapshots")
- else:
- self.fail("expected getpath of subvolume with retained snapshots to fail")
-
- # resize
- nsize = self.DEFAULT_FILE_SIZE*1024*1024
- try:
- self._fs_cmd("subvolume", "resize", self.volname, subvolume, str(nsize))
- except CommandFailedError as ce:
- self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on resize of subvolume with retained snapshots")
- else:
- self.fail("expected resize of subvolume with retained snapshots to fail")
-
- # snap-create
+ # snapshot info for non-existent snapshot
try:
- self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, "fail")
+ self._get_subvolume_snapshot_info(self.volname, subvolume, snap_missing)
except CommandFailedError as ce:
- self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on snapshot create of subvolume with retained snapshots")
+ self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on snapshot info of non-existent snapshot")
else:
- self.fail("expected snapshot create of subvolume with retained snapshots to fail")
+ self.fail("expected snapshot info of non-existent snapshot to fail")
- # remove snapshot (should remove volume)
+ # remove snapshot
self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
- # verify list subvolumes returns an empty list
- subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
- self.assertEqual(len(subvolumels), 0)
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
# verify trash dir is clean
self._wait_for_trash_empty()
- def test_subvolume_retain_snapshot_without_snapshots(self):
+ def test_subvolume_snapshot_in_group(self):
+ subvolume = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create group
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+
+ # snapshot subvolume in group
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot, group)
+
+ # remove snapshot
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, group)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ # remove group
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ def test_subvolume_snapshot_ls(self):
+ # tests the 'fs subvolume snapshot ls' command
+
+ snapshots = []
+
+ # create subvolume
+ subvolume = self._generate_random_subvolume_name()
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # create subvolume snapshots
+ snapshots = self._generate_random_snapshot_name(3)
+ for snapshot in snapshots:
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume))
+ if len(subvolsnapshotls) == 0:
+ self.fail("Expected the 'fs subvolume snapshot ls' command to list the created subvolume snapshots")
+ else:
+ snapshotnames = [snapshot['name'] for snapshot in subvolsnapshotls]
+ if collections.Counter(snapshotnames) != collections.Counter(snapshots):
+ self.fail("Error creating or listing subvolume snapshots")
+
+ # remove snapshot
+ for snapshot in snapshots:
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_inherited_snapshot_ls(self):
+ # tests the scenario where 'fs subvolume snapshot ls' command
+ # should not list inherited snapshots created as part of snapshot
+ # at ancestral level
+
+ snapshots = []
+ subvolume = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+ snap_count = 3
+
+ # create group
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+
+ # create subvolume snapshots
+ snapshots = self._generate_random_snapshot_name(snap_count)
+ for snapshot in snapshots:
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot, group)
+
+ # Create snapshot at ancestral level
+ ancestral_snappath1 = os.path.join(".", "volumes", group, ".snap", "ancestral_snap_1")
+ ancestral_snappath2 = os.path.join(".", "volumes", group, ".snap", "ancestral_snap_2")
+ self.mount_a.run_shell(['mkdir', '-p', ancestral_snappath1, ancestral_snappath2], sudo=True)
+
+ subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume, group))
+ self.assertEqual(len(subvolsnapshotls), snap_count)
+
+ # remove ancestral snapshots
+ self.mount_a.run_shell(['rmdir', ancestral_snappath1, ancestral_snappath2], sudo=True)
+
+ # remove snapshot
+ for snapshot in snapshots:
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, group)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ # remove group
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ def test_subvolume_inherited_snapshot_info(self):
+ """
+ tests the scenario where 'fs subvolume snapshot info' command
+ should fail for inherited snapshots created as part of snapshot
+ at ancestral level
+ """
+
+ subvolume = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+
+ # create group
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+
+ # Create snapshot at ancestral level
+ ancestral_snap_name = "ancestral_snap_1"
+ ancestral_snappath1 = os.path.join(".", "volumes", group, ".snap", ancestral_snap_name)
+ self.mount_a.run_shell(['mkdir', '-p', ancestral_snappath1], sudo=True)
+
+ # Validate existence of inherited snapshot
+ group_path = os.path.join(".", "volumes", group)
+ inode_number_group_dir = int(self.mount_a.run_shell(['stat', '-c' '%i', group_path]).stdout.getvalue().strip())
+ inherited_snap = "_{0}_{1}".format(ancestral_snap_name, inode_number_group_dir)
+ inherited_snappath = os.path.join(".", "volumes", group, subvolume,".snap", inherited_snap)
+ self.mount_a.run_shell(['ls', inherited_snappath])
+
+ # snapshot info on inherited snapshot
+ try:
+ self._get_subvolume_snapshot_info(self.volname, subvolume, inherited_snap, group)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on snapshot info of inherited snapshot")
+ else:
+ self.fail("expected snapshot info of inherited snapshot to fail")
+
+ # remove ancestral snapshots
+ self.mount_a.run_shell(['rmdir', ancestral_snappath1], sudo=True)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--group_name", group)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ # remove group
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ def test_subvolume_inherited_snapshot_rm(self):
+ """
+ tests the scenario where 'fs subvolume snapshot rm' command
+ should fail for inherited snapshots created as part of snapshot
+ at ancestral level
+ """
+
+ subvolume = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+
+ # create group
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+
+ # Create snapshot at ancestral level
+ ancestral_snap_name = "ancestral_snap_1"
+ ancestral_snappath1 = os.path.join(".", "volumes", group, ".snap", ancestral_snap_name)
+ self.mount_a.run_shell(['mkdir', '-p', ancestral_snappath1], sudo=True)
+
+ # Validate existence of inherited snap
+ group_path = os.path.join(".", "volumes", group)
+ inode_number_group_dir = int(self.mount_a.run_shell(['stat', '-c' '%i', group_path]).stdout.getvalue().strip())
+ inherited_snap = "_{0}_{1}".format(ancestral_snap_name, inode_number_group_dir)
+ inherited_snappath = os.path.join(".", "volumes", group, subvolume,".snap", inherited_snap)
+ self.mount_a.run_shell(['ls', inherited_snappath])
+
+ # inherited snapshot should not be deletable
+ try:
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, inherited_snap, "--group_name", group)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.EINVAL, msg="invalid error code when removing inherited snapshot")
+ else:
+ self.fail("expected removing inheirted snapshot to fail")
+
+ # remove ancestral snapshots
+ self.mount_a.run_shell(['rmdir', ancestral_snappath1], sudo=True)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ # remove group
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ def test_subvolume_subvolumegroup_snapshot_name_conflict(self):
+ """
+ tests the scenario where creation of subvolume snapshot name
+ with same name as it's subvolumegroup snapshot name. This should
+ fail.
+ """
+
+ subvolume = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+ group_snapshot = self._generate_random_snapshot_name()
+
+ # create group
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--group_name", group)
+
+ # Create subvolumegroup snapshot
+ group_snapshot_path = os.path.join(".", "volumes", group, ".snap", group_snapshot)
+ self.mount_a.run_shell(['mkdir', '-p', group_snapshot_path], sudo=True)
+
+ # Validate existence of subvolumegroup snapshot
+ self.mount_a.run_shell(['ls', group_snapshot_path])
+
+ # Creation of subvolume snapshot with it's subvolumegroup snapshot name should fail
+ try:
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, group_snapshot, "--group_name", group)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.EINVAL, msg="invalid error code when creating subvolume snapshot with same name as subvolume group snapshot")
+ else:
+ self.fail("expected subvolume snapshot creation with same name as subvolumegroup snapshot to fail")
+
+ # remove subvolumegroup snapshot
+ self.mount_a.run_shell(['rmdir', group_snapshot_path], sudo=True)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ # remove group
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ def test_subvolume_retain_snapshot_invalid_recreate(self):
+ """
+ ensure retained subvolume recreate does not leave any incarnations in the subvolume and trash
+ """
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # remove with snapshot retention
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+ # recreate subvolume with an invalid pool
+ data_pool = "invalid_pool"
+ try:
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--pool_layout", data_pool)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on recreate of subvolume with invalid poolname")
+ else:
+ self.fail("expected recreate of subvolume with invalid poolname to fail")
+
+ # fetch info
+ subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
+ self.assertEqual(subvol_info["state"], "snapshot-retained",
+ msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+
+ # getpath
+ try:
+ self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on getpath of subvolume with retained snapshots")
+ else:
+ self.fail("expected getpath of subvolume with retained snapshots to fail")
+
+ # remove snapshot (should remove volume)
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_retain_snapshot_recreate_subvolume(self):
+ """
+ ensure a retained subvolume can be recreated and further snapshotted
+ """
+ snap_md = ["created_at", "data_pool", "has_pending_clones", "size"]
+
+ subvolume = self._generate_random_subvolume_name()
+ snapshot1, snapshot2 = self._generate_random_snapshot_name(2)
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot1)
+
+ # remove with snapshot retention
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+ # fetch info
+ subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
+ self.assertEqual(subvol_info["state"], "snapshot-retained",
+ msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+
+ # recreate retained subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # fetch info
+ subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
+ self.assertEqual(subvol_info["state"], "complete",
+ msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+
+ # snapshot info (older snapshot)
+ snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot1))
+ for md in snap_md:
+ self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
+ self.assertEqual(snap_info["has_pending_clones"], "no")
+
+ # snap-create (new snapshot)
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot2)
+
+ # remove with retain snapshots
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+ # list snapshots
+ subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume))
+ self.assertEqual(len(subvolsnapshotls), 2, "Expected the 'fs subvolume snapshot ls' command to list the"
+ " created subvolume snapshots")
+ snapshotnames = [snapshot['name'] for snapshot in subvolsnapshotls]
+ for snap in [snapshot1, snapshot2]:
+ self.assertIn(snap, snapshotnames, "Missing snapshot '{0}' in snapshot list".format(snap))
+
+ # remove snapshots (should remove volume)
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot1)
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot2)
+
+ # verify list subvolumes returns an empty list
+ subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+ self.assertEqual(len(subvolumels), 0)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_retain_snapshot_with_snapshots(self):
+ """
+ ensure retain snapshots based delete of a subvolume with snapshots retains the subvolume
+ also test allowed and dis-allowed operations on a retained subvolume
+ """
+ snap_md = ["created_at", "data_pool", "has_pending_clones", "size"]
+
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # remove subvolume -- should fail with ENOTEMPTY since it has snapshots
+ try:
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of retained subvolume with snapshots")
+ else:
+ self.fail("expected rm of subvolume with retained snapshots to fail")
+
+ # remove with snapshot retention
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+ # fetch info
+ subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
+ self.assertEqual(subvol_info["state"], "snapshot-retained",
+ msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+
+ ## test allowed ops in retained state
+ # ls
+ subvolumes = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+ self.assertEqual(len(subvolumes), 1, "subvolume ls count mismatch, expected '1', found {0}".format(len(subvolumes)))
+ self.assertEqual(subvolumes[0]['name'], subvolume,
+ "subvolume name mismatch in ls output, expected '{0}', found '{1}'".format(subvolume, subvolumes[0]['name']))
+
+ # snapshot info
+ snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot))
+ for md in snap_md:
+ self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
+ self.assertEqual(snap_info["has_pending_clones"], "no")
+
+ # rm --force (allowed but should fail)
+ try:
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--force")
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of subvolume with retained snapshots")
+ else:
+ self.fail("expected rm of subvolume with retained snapshots to fail")
+
+ # rm (allowed but should fail)
+ try:
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of subvolume with retained snapshots")
+ else:
+ self.fail("expected rm of subvolume with retained snapshots to fail")
+
+ ## test disallowed ops
+ # getpath
+ try:
+ self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on getpath of subvolume with retained snapshots")
+ else:
+ self.fail("expected getpath of subvolume with retained snapshots to fail")
+
+ # resize
+ nsize = self.DEFAULT_FILE_SIZE*1024*1024
+ try:
+ self._fs_cmd("subvolume", "resize", self.volname, subvolume, str(nsize))
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on resize of subvolume with retained snapshots")
+ else:
+ self.fail("expected resize of subvolume with retained snapshots to fail")
+
+ # snap-create
+ try:
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, "fail")
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on snapshot create of subvolume with retained snapshots")
+ else:
+ self.fail("expected snapshot create of subvolume with retained snapshots to fail")
+
+ # remove snapshot (should remove volume)
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # verify list subvolumes returns an empty list
+ subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+ self.assertEqual(len(subvolumels), 0)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_retain_snapshot_without_snapshots(self):
+ """
+ ensure retain snapshots based delete of a subvolume with no snapshots, deletes the subbvolume
+ """
+ subvolume = self._generate_random_subvolume_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # remove with snapshot retention (should remove volume, no snapshots to retain)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+ # verify list subvolumes returns an empty list
+ subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+ self.assertEqual(len(subvolumels), 0)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_retain_snapshot_trash_busy_recreate(self):
+ """
+ ensure retained subvolume recreate fails if its trash is not yet purged
+ """
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # remove with snapshot retention
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+ # fake a trash entry
+ self._update_fake_trash(subvolume)
+
+ # recreate subvolume
+ try:
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.EAGAIN, "invalid error code on recreate of subvolume with purge pending")
+ else:
+ self.fail("expected recreate of subvolume with purge pending to fail")
+
+ # clear fake trash entry
+ self._update_fake_trash(subvolume, create=False)
+
+ # recreate subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # remove snapshot
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_rm_with_snapshots(self):
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # remove subvolume -- should fail with ENOTEMPTY since it has snapshots
+ try:
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ except CommandFailedError as ce:
+ if ce.exitstatus != errno.ENOTEMPTY:
+ raise RuntimeError("invalid error code returned when deleting subvolume with snapshots")
+ else:
+ raise RuntimeError("expected subvolume deletion to fail")
+
+ # remove snapshot
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_snapshot_protect_unprotect_sanity(self):
+ """
+ Snapshot protect/unprotect commands are deprecated. This test exists to ensure that
+ invoking the command does not cause errors, till they are removed from a subsequent release.
+ """
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+ clone = self._generate_random_clone_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+
+ # do some IO
+ self._do_subvolume_io(subvolume, number_of_files=64)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # now, protect snapshot
+ self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
+
+ # schedule a clone
+ self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+ # check clone status
+ self._wait_for_clone_to_complete(clone)
+
+ # now, unprotect snapshot
+ self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+
+ # verify clone
+ self._verify_clone(subvolume, snapshot, clone)
+
+ # remove snapshot
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # remove subvolumes
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_snapshot_rm_force(self):
+ # test removing non existing subvolume snapshot with --force
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # remove snapshot
+ try:
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, "--force")
+ except CommandFailedError:
+ raise RuntimeError("expected the 'fs subvolume snapshot rm --force' command to succeed")
+
+ def test_subvolume_snapshot_metadata_set(self):
+ """
+ Set custom metadata for subvolume snapshot.
+ """
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+ # set metadata for snapshot.
+ key = "key"
+ value = "value"
+ try:
+ self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume snapshot metadata set' command to succeed")
+
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
+
+ def test_subvolume_snapshot_metadata_set_idempotence(self):
+ """
+ Set custom metadata for subvolume snapshot (Idempotency).
+ """
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+ # set metadata for snapshot.
+ key = "key"
+ value = "value"
+ try:
+ self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume snapshot metadata set' command to succeed")
+
+ # set same metadata again for subvolume.
+ try:
+ self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume snapshot metadata set' command to succeed because it is idempotent operation")
+
+ # get value for specified key.
+ try:
+ ret = self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, key, group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume snapshot metadata get' command to succeed")
+
+ # remove '\n' from returned value.
+ ret = ret.strip('\n')
+
+ # match received value with expected value.
+ self.assertEqual(value, ret)
+
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
+
+ def test_subvolume_snapshot_metadata_get(self):
+ """
+ Get custom metadata for a specified key in subvolume snapshot metadata.
+ """
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+ # set metadata for snapshot.
+ key = "key"
+ value = "value"
+ self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+
+ # get value for specified key.
+ try:
+ ret = self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, key, group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume snapshot metadata get' command to succeed")
+
+ # remove '\n' from returned value.
+ ret = ret.strip('\n')
+
+ # match received value with expected value.
+ self.assertEqual(value, ret)
+
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
+
+ def test_subvolume_snapshot_metadata_get_for_nonexisting_key(self):
+ """
+ Get custom metadata for subvolume snapshot if specified key not exist in metadata.
+ """
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+ # set metadata for snapshot.
+ key = "key"
+ value = "value"
+ self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+
+ # try to get value for nonexisting key
+ # Expecting ENOENT exit status because key does not exist
+ try:
+ self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, "key_nonexist", group)
+ except CommandFailedError as e:
+ self.assertEqual(e.exitstatus, errno.ENOENT)
+ else:
+ self.fail("Expected ENOENT because 'key_nonexist' does not exist")
+
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
+
+ def test_subvolume_snapshot_metadata_get_for_nonexisting_section(self):
+ """
+ Get custom metadata for subvolume snapshot if metadata is not added for subvolume snapshot.
+ """
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+ # try to get value for nonexisting key (as section does not exist)
+ # Expecting ENOENT exit status because key does not exist
+ try:
+ self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, "key", group)
+ except CommandFailedError as e:
+ self.assertEqual(e.exitstatus, errno.ENOENT)
+ else:
+ self.fail("Expected ENOENT because section does not exist")
+
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
+
+ def test_subvolume_snapshot_metadata_update(self):
+ """
+ Update custom metadata for a specified key in subvolume snapshot metadata.
+ """
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+ # set metadata for snapshot.
+ key = "key"
+ value = "value"
+ self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+
+ # update metadata against key.
+ new_value = "new_value"
+ self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, new_value, group)
+
+ # get metadata for specified key of snapshot.
+ try:
+ ret = self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, key, group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume snapshot metadata get' command to succeed")
+
+ # remove '\n' from returned value.
+ ret = ret.strip('\n')
+
+ # match received value with expected value.
+ self.assertEqual(new_value, ret)
+
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
+
+ def test_subvolume_snapshot_metadata_list(self):
+ """
+ List custom metadata for subvolume snapshot.
+ """
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+ # set metadata for subvolume.
+ input_metadata_dict = {f'key_{i}' : f'value_{i}' for i in range(3)}
+
+ for k, v in input_metadata_dict.items():
+ self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, k, v, group)
+
+ # list metadata
+ try:
+ ret_dict = json.loads(self._fs_cmd("subvolume", "snapshot", "metadata", "ls", self.volname, subvolname, snapshot, group))
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume snapshot metadata ls' command to succeed")
+
+ # compare output with expected output
+ self.assertDictEqual(input_metadata_dict, ret_dict)
+
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
+
+ def test_subvolume_snapshot_metadata_list_if_no_metadata_set(self):
+ """
+ List custom metadata for subvolume snapshot if metadata is not added for subvolume snapshot.
+ """
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+ # list metadata
+ try:
+ ret_dict = json.loads(self._fs_cmd("subvolume", "snapshot", "metadata", "ls", self.volname, subvolname, snapshot, group))
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume snapshot metadata ls' command to succeed")
+
+ # compare output with expected output
+ empty_dict = {}
+ self.assertDictEqual(ret_dict, empty_dict)
+
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
+
+ def test_subvolume_snapshot_metadata_remove(self):
+ """
+ Remove custom metadata for a specified key in subvolume snapshot metadata.
+ """
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+ # set metadata for snapshot.
+ key = "key"
+ value = "value"
+ self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+
+ # remove metadata against specified key.
+ try:
+ self._fs_cmd("subvolume", "snapshot", "metadata", "rm", self.volname, subvolname, snapshot, key, group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume snapshot metadata rm' command to succeed")
+
+ # confirm key is removed by again fetching metadata
+ try:
+ self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, key, snapshot, group)
+ except CommandFailedError as e:
+ self.assertEqual(e.exitstatus, errno.ENOENT)
+ else:
+ self.fail("Expected ENOENT because key does not exist")
+
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
+
+ def test_subvolume_snapshot_metadata_remove_for_nonexisting_key(self):
+ """
+ Remove custom metadata for subvolume snapshot if specified key not exist in metadata.
+ """
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
+
+ # set metadata for snapshot.
+ key = "key"
+ value = "value"
+ self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+
+ # try to remove value for nonexisting key
+ # Expecting ENOENT exit status because key does not exist
+ try:
+ self._fs_cmd("subvolume", "snapshot", "metadata", "rm", self.volname, subvolname, snapshot, "key_nonexist", group)
+ except CommandFailedError as e:
+ self.assertEqual(e.exitstatus, errno.ENOENT)
+ else:
+ self.fail("Expected ENOENT because 'key_nonexist' does not exist")
+
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
+
+ def test_subvolume_snapshot_metadata_remove_for_nonexisting_section(self):
"""
- ensure retain snapshots based delete of a subvolume with no snapshots, deletes the subbvolume
+ Remove custom metadata for subvolume snapshot if metadata is not added for subvolume snapshot.
"""
- subvolume = self._generate_random_subvolume_name()
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+ snapshot = self._generate_random_snapshot_name()
- # create subvolume
- self._fs_cmd("subvolume", "create", self.volname, subvolume)
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
- # remove with snapshot retention (should remove volume, no snapshots to retain)
- self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
- # verify list subvolumes returns an empty list
- subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
- self.assertEqual(len(subvolumels), 0)
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
- # verify trash dir is clean
+ # try to remove value for nonexisting key (as section does not exist)
+ # Expecting ENOENT exit status because key does not exist
+ try:
+ self._fs_cmd("subvolume", "snapshot", "metadata", "rm", self.volname, subvolname, snapshot, "key", group)
+ except CommandFailedError as e:
+ self.assertEqual(e.exitstatus, errno.ENOENT)
+ else:
+ self.fail("Expected ENOENT because section does not exist")
+
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ # verify trash dir is clean.
self._wait_for_trash_empty()
- def test_subvolume_retain_snapshot_trash_busy_recreate(self):
+ def test_subvolume_snapshot_metadata_remove_force(self):
"""
- ensure retained subvolume recreate fails if its trash is not yet purged
+ Forcefully remove custom metadata for a specified key in subvolume snapshot metadata.
"""
- subvolume = self._generate_random_subvolume_name()
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
snapshot = self._generate_random_snapshot_name()
- # create subvolume
- self._fs_cmd("subvolume", "create", self.volname, subvolume)
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
# snapshot subvolume
- self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
- # remove with snapshot retention
- self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+ # set metadata for snapshot.
+ key = "key"
+ value = "value"
+ self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
- # fake a trash entry
- self._update_fake_trash(subvolume)
+ # remove metadata against specified key with --force option.
+ try:
+ self._fs_cmd("subvolume", "snapshot", "metadata", "rm", self.volname, subvolname, snapshot, key, group, "--force")
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume snapshot metadata rm' command to succeed")
- # recreate subvolume
+ # confirm key is removed by again fetching metadata
try:
- self._fs_cmd("subvolume", "create", self.volname, subvolume)
- except CommandFailedError as ce:
- self.assertEqual(ce.exitstatus, errno.EAGAIN, "invalid error code on recreate of subvolume with purge pending")
+ self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, key, group)
+ except CommandFailedError as e:
+ self.assertEqual(e.exitstatus, errno.ENOENT)
else:
- self.fail("expected recreate of subvolume with purge pending to fail")
-
- # clear fake trash entry
- self._update_fake_trash(subvolume, create=False)
-
- # recreate subvolume
- self._fs_cmd("subvolume", "create", self.volname, subvolume)
-
- # remove snapshot
- self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+ self.fail("Expected ENOENT because key does not exist")
- # remove subvolume
- self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
- # verify trash dir is clean
+ # verify trash dir is clean.
self._wait_for_trash_empty()
- def test_subvolume_rm_with_snapshots(self):
- subvolume = self._generate_random_subvolume_name()
+ def test_subvolume_snapshot_metadata_remove_force_for_nonexisting_key(self):
+ """
+ Forcefully remove custom metadata for subvolume snapshot if specified key not exist in metadata.
+ """
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
snapshot = self._generate_random_snapshot_name()
- # create subvolume
- self._fs_cmd("subvolume", "create", self.volname, subvolume)
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
+
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
# snapshot subvolume
- self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
- # remove subvolume -- should fail with ENOTEMPTY since it has snapshots
+ # set metadata for snapshot.
+ key = "key"
+ value = "value"
+ self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
+
+ # remove metadata against specified key.
try:
- self._fs_cmd("subvolume", "rm", self.volname, subvolume)
- except CommandFailedError as ce:
- if ce.exitstatus != errno.ENOTEMPTY:
- raise RuntimeError("invalid error code returned when deleting subvolume with snapshots")
+ self._fs_cmd("subvolume", "snapshot", "metadata", "rm", self.volname, subvolname, snapshot, key, group)
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume snapshot metadata rm' command to succeed")
+
+ # confirm key is removed by again fetching metadata
+ try:
+ self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, key, group)
+ except CommandFailedError as e:
+ self.assertEqual(e.exitstatus, errno.ENOENT)
else:
- raise RuntimeError("expected subvolume deletion to fail")
+ self.fail("Expected ENOENT because key does not exist")
- # remove snapshot
- self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+ # again remove metadata against already removed key with --force option.
+ try:
+ self._fs_cmd("subvolume", "snapshot", "metadata", "rm", self.volname, subvolname, snapshot, key, group, "--force")
+ except CommandFailedError:
+ self.fail("expected the 'fs subvolume snapshot metadata rm' (with --force) command to succeed")
- # remove subvolume
- self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
- # verify trash dir is clean
+ # verify trash dir is clean.
self._wait_for_trash_empty()
- def test_subvolume_snapshot_protect_unprotect_sanity(self):
+ def test_subvolume_snapshot_metadata_after_snapshot_remove(self):
"""
- Snapshot protect/unprotect commands are deprecated. This test exists to ensure that
- invoking the command does not cause errors, till they are removed from a subsequent release.
+ Verify metadata removal of subvolume snapshot after snapshot removal.
"""
- subvolume = self._generate_random_subvolume_name()
+ subvolname = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
snapshot = self._generate_random_snapshot_name()
- clone = self._generate_random_clone_name()
- # create subvolume
- self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+ # create group.
+ self._fs_cmd("subvolumegroup", "create", self.volname, group)
- # do some IO
- self._do_subvolume_io(subvolume, number_of_files=64)
+ # create subvolume in group.
+ self._fs_cmd("subvolume", "create", self.volname, subvolname, group)
# snapshot subvolume
- self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
-
- # now, protect snapshot
- self._fs_cmd("subvolume", "snapshot", "protect", self.volname, subvolume, snapshot)
-
- # schedule a clone
- self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolname, snapshot, group)
- # check clone status
- self._wait_for_clone_to_complete(clone)
+ # set metadata for snapshot.
+ key = "key"
+ value = "value"
+ self._fs_cmd("subvolume", "snapshot", "metadata", "set", self.volname, subvolname, snapshot, key, value, group)
- # now, unprotect snapshot
- self._fs_cmd("subvolume", "snapshot", "unprotect", self.volname, subvolume, snapshot)
+ # get value for specified key.
+ ret = self._fs_cmd("subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, key, group)
- # verify clone
- self._verify_clone(subvolume, snapshot, clone)
+ # remove '\n' from returned value.
+ ret = ret.strip('\n')
- # remove snapshot
- self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+ # match received value with expected value.
+ self.assertEqual(value, ret)
- # remove subvolumes
- self._fs_cmd("subvolume", "rm", self.volname, subvolume)
- self._fs_cmd("subvolume", "rm", self.volname, clone)
+ # remove subvolume snapshot.
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolname, snapshot, group)
- # verify trash dir is clean
- self._wait_for_trash_empty()
+ # try to get metadata after removing snapshot.
+ # Expecting error ENOENT with error message of snapshot does not exist
+ cmd_ret = self.mgr_cluster.mon_manager.run_cluster_cmd(
+ args=["fs", "subvolume", "snapshot", "metadata", "get", self.volname, subvolname, snapshot, key, group],
+ check_status=False, stdout=StringIO(), stderr=StringIO())
+ self.assertEqual(cmd_ret.returncode, errno.ENOENT, "Expecting ENOENT error")
+ self.assertIn(f"snapshot '{snapshot}' does not exist", cmd_ret.stderr.getvalue(),
+ f"Expecting message: snapshot '{snapshot}' does not exist ")
- def test_subvolume_snapshot_rm_force(self):
- # test removing non existing subvolume snapshot with --force
- subvolume = self._generate_random_subvolume_name()
- snapshot = self._generate_random_snapshot_name()
+ # confirm metadata is removed by searching section name in .meta file
+ meta_path = os.path.join(".", "volumes", group, subvolname, ".meta")
+ section_name = "SNAP_METADATA_" + snapshot
- # remove snapshot
try:
- self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot, "--force")
- except CommandFailedError:
- raise RuntimeError("expected the 'fs subvolume snapshot rm --force' command to succeed")
+ self.mount_a.run_shell(f"sudo grep {section_name} {meta_path}", omit_sudo=False)
+ except CommandFailedError as e:
+ self.assertNotEqual(e.exitstatus, 0)
+ else:
+ self.fail("Expected non-zero exist status because section should not exist")
+ self._fs_cmd("subvolume", "rm", self.volname, subvolname, group)
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ # verify trash dir is clean.
+ self._wait_for_trash_empty()
class TestSubvolumeSnapshotClones(TestVolumesHelper):
""" Tests for FS subvolume snapshot clone operations."""
# verify trash dir is clean
self._wait_for_trash_empty()
+ def test_clone_failure_status_pending_in_progress_complete(self):
+ """
+ ensure failure status is not shown when clone is not in failed/cancelled state
+ """
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+ clone1 = self._generate_random_clone_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+
+ # do some IO
+ self._do_subvolume_io(subvolume, number_of_files=200)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # Insert delay at the beginning of snapshot clone
+ self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 5)
+
+ # schedule a clone1
+ self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone1)
+
+ # pending clone shouldn't show failure status
+ clone1_result = self._get_clone_status(clone1)
+ try:
+ clone1_result["status"]["failure"]["errno"]
+ except KeyError as e:
+ self.assertEqual(str(e), "'failure'")
+ else:
+ self.fail("clone status shouldn't show failure for pending clone")
+
+ # check clone1 to be in-progress
+ self._wait_for_clone_to_be_in_progress(clone1)
+
+ # in-progress clone1 shouldn't show failure status
+ clone1_result = self._get_clone_status(clone1)
+ try:
+ clone1_result["status"]["failure"]["errno"]
+ except KeyError as e:
+ self.assertEqual(str(e), "'failure'")
+ else:
+ self.fail("clone status shouldn't show failure for in-progress clone")
+
+ # wait for clone1 to complete
+ self._wait_for_clone_to_complete(clone1)
+
+ # complete clone1 shouldn't show failure status
+ clone1_result = self._get_clone_status(clone1)
+ try:
+ clone1_result["status"]["failure"]["errno"]
+ except KeyError as e:
+ self.assertEqual(str(e), "'failure'")
+ else:
+ self.fail("clone status shouldn't show failure for complete clone")
+
+ # remove snapshot
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # remove subvolumes
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ self._fs_cmd("subvolume", "rm", self.volname, clone1)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_clone_failure_status_failed(self):
+ """
+ ensure failure status is shown when clone is in failed state and validate the reason
+ """
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+ clone1 = self._generate_random_clone_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+
+ # do some IO
+ self._do_subvolume_io(subvolume, number_of_files=200)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # Insert delay at the beginning of snapshot clone
+ self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 5)
+
+ # schedule a clone1
+ self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone1)
+
+ # remove snapshot from backend to force the clone failure.
+ snappath = os.path.join(".", "volumes", "_nogroup", subvolume, ".snap", snapshot)
+ self.mount_a.run_shell(['rmdir', snappath], sudo=True)
+
+ # wait for clone1 to fail.
+ self._wait_for_clone_to_fail(clone1)
+
+ # check clone1 status
+ clone1_result = self._get_clone_status(clone1)
+ self.assertEqual(clone1_result["status"]["state"], "failed")
+ self.assertEqual(clone1_result["status"]["failure"]["errno"], "2")
+ self.assertEqual(clone1_result["status"]["failure"]["error_msg"], "snapshot '{0}' does not exist".format(snapshot))
+
+ # clone removal should succeed after failure, remove clone1
+ self._fs_cmd("subvolume", "rm", self.volname, clone1, "--force")
+
+ # remove subvolumes
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_clone_failure_status_pending_cancelled(self):
+ """
+ ensure failure status is shown when clone is cancelled during pending state and validate the reason
+ """
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+ clone1 = self._generate_random_clone_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+
+ # do some IO
+ self._do_subvolume_io(subvolume, number_of_files=200)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # Insert delay at the beginning of snapshot clone
+ self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 5)
+
+ # schedule a clone1
+ self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone1)
+
+ # cancel pending clone1
+ self._fs_cmd("clone", "cancel", self.volname, clone1)
+
+ # check clone1 status
+ clone1_result = self._get_clone_status(clone1)
+ self.assertEqual(clone1_result["status"]["state"], "canceled")
+ self.assertEqual(clone1_result["status"]["failure"]["errno"], "4")
+ self.assertEqual(clone1_result["status"]["failure"]["error_msg"], "user interrupted clone operation")
+
+ # clone removal should succeed with force after cancelled, remove clone1
+ self._fs_cmd("subvolume", "rm", self.volname, clone1, "--force")
+
+ # remove snapshot
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # remove subvolumes
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_clone_failure_status_in_progress_cancelled(self):
+ """
+ ensure failure status is shown when clone is cancelled during in-progress state and validate the reason
+ """
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+ clone1 = self._generate_random_clone_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--mode=777")
+
+ # do some IO
+ self._do_subvolume_io(subvolume, number_of_files=200)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # Insert delay at the beginning of snapshot clone
+ self.config_set('mgr', 'mgr/volumes/snapshot_clone_delay', 5)
+
+ # schedule a clone1
+ self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone1)
+
+ # wait for clone1 to be in-progress
+ self._wait_for_clone_to_be_in_progress(clone1)
+
+ # cancel in-progess clone1
+ self._fs_cmd("clone", "cancel", self.volname, clone1)
+
+ # check clone1 status
+ clone1_result = self._get_clone_status(clone1)
+ self.assertEqual(clone1_result["status"]["state"], "canceled")
+ self.assertEqual(clone1_result["status"]["failure"]["errno"], "4")
+ self.assertEqual(clone1_result["status"]["failure"]["error_msg"], "user interrupted clone operation")
+
+ # clone removal should succeed with force after cancelled, remove clone1
+ self._fs_cmd("subvolume", "rm", self.volname, clone1, "--force")
+
+ # remove snapshot
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # remove subvolumes
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
def test_subvolume_snapshot_clone(self):
subvolume = self._generate_random_subvolume_name()
snapshot = self._generate_random_snapshot_name()