+ # clone (older snapshot)
+ self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, 'fake', clone2)
+
+ # check clone status
+ self._wait_for_clone_to_complete(clone2)
+
+ # ensure clone is v2
+ self._assert_meta_location_and_version(self.volname, clone2, version=2)
+
+ # verify clone
+ # TODO: rentries will mismatch till this is fixed https://tracker.ceph.com/issues/46747
+ #self._verify_clone(subvolume, 'fake', clone2, source_version=1)
+
+ # snap-info
+ snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot))
+ for md in snap_md:
+ self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
+ self.assertEqual(snap_info["has_pending_clones"], "no")
+
+ # snap-ls
+ subvol_snapshots = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume))
+ self.assertEqual(len(subvol_snapshots), 2, "subvolume ls count mismatch, expected 2', found {0}".format(len(subvol_snapshots)))
+ snapshotnames = [snapshot['name'] for snapshot in subvol_snapshots]
+ for name in [snapshot, 'fake']:
+ self.assertIn(name, snapshotnames, msg="expected snapshot '{0}' in subvolume snapshot ls".format(name))
+
+ # snap-rm
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, "fake")
+
+ # ensure volume is still at version 1
+ self._assert_meta_location_and_version(self.volname, subvolume, version=1)
+
+ # rm
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ self._fs_cmd("subvolume", "rm", self.volname, clone1)
+ self._fs_cmd("subvolume", "rm", self.volname, clone2)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_no_upgrade_v1_to_v2(self):
+ """
+ poor man's upgrade test -- theme continues...
+ ensure v1 to v2 upgrades are not done automatically due to various states of v1
+ """
+ subvolume1, subvolume2, subvolume3 = self._generate_random_subvolume_name(3)
+ group = self._generate_random_group_name()
+
+ # emulate a v1 subvolume -- in the default group
+ subvol1_path = self._create_v1_subvolume(subvolume1)
+
+ # emulate a v1 subvolume -- in a custom group
+ subvol2_path = self._create_v1_subvolume(subvolume2, subvol_group=group)
+
+ # emulate a v1 subvolume -- in a clone pending state
+ self._create_v1_subvolume(subvolume3, subvol_type='clone', has_snapshot=False, state='pending')
+
+ # this would attempt auto-upgrade on access, but fail to do so as snapshots exist
+ subvolpath1 = self._get_subvolume_path(self.volname, subvolume1)
+ self.assertEqual(subvolpath1, subvol1_path)
+
+ subvolpath2 = self._get_subvolume_path(self.volname, subvolume2, group_name=group)
+ self.assertEqual(subvolpath2, subvol2_path)
+
+ # this would attempt auto-upgrade on access, but fail to do so as volume is not complete
+ # use clone status, as only certain operations are allowed in pending state
+ status = json.loads(self._fs_cmd("clone", "status", self.volname, subvolume3))
+ self.assertEqual(status["status"]["state"], "pending")
+
+ # remove snapshot
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume1, "fake")
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume2, "fake", group)
+
+ # ensure metadata file is in v1 location, with version retained as v1
+ self._assert_meta_location_and_version(self.volname, subvolume1, version=1)
+ self._assert_meta_location_and_version(self.volname, subvolume2, subvol_group=group, version=1)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume1)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume2, group)
+ try:
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume3)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.EAGAIN, "invalid error code on rm of subvolume undergoing clone")
+ else:
+ self.fail("expected rm of subvolume undergoing clone to fail")
+
+ # ensure metadata file is in v1 location, with version retained as v1
+ self._assert_meta_location_and_version(self.volname, subvolume3, version=1)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume3, "--force")
+
+ # verify list subvolumes returns an empty list
+ subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+ self.assertEqual(len(subvolumels), 0)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_upgrade_v1_to_v2(self):
+ """
+ poor man's upgrade test -- theme continues...
+ ensure v1 to v2 upgrades work
+ """
+ subvolume1, subvolume2 = self._generate_random_subvolume_name(2)
+ group = self._generate_random_group_name()
+
+ # emulate a v1 subvolume -- in the default group
+ subvol1_path = self._create_v1_subvolume(subvolume1, has_snapshot=False)
+
+ # emulate a v1 subvolume -- in a custom group
+ subvol2_path = self._create_v1_subvolume(subvolume2, subvol_group=group, has_snapshot=False)
+
+ # this would attempt auto-upgrade on access
+ subvolpath1 = self._get_subvolume_path(self.volname, subvolume1)
+ self.assertEqual(subvolpath1, subvol1_path)
+
+ subvolpath2 = self._get_subvolume_path(self.volname, subvolume2, group_name=group)
+ self.assertEqual(subvolpath2, subvol2_path)
+
+ # ensure metadata file is in v2 location, with version retained as v2
+ self._assert_meta_location_and_version(self.volname, subvolume1, version=2)
+ self._assert_meta_location_and_version(self.volname, subvolume2, subvol_group=group, version=2)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume1)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume2, group)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_rm_with_snapshots(self):
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # remove subvolume -- should fail with ENOTEMPTY since it has snapshots
+ try:
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ except CommandFailedError as ce:
+ if ce.exitstatus != errno.ENOTEMPTY:
+ raise RuntimeError("invalid error code returned when deleting subvolume with snapshots")
+ else:
+ raise RuntimeError("expected subvolume deletion to fail")
+
+ # remove snapshot
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_retain_snapshot_without_snapshots(self):
+ """
+ ensure retain snapshots based delete of a subvolume with no snapshots, deletes the subbvolume
+ """
+ subvolume = self._generate_random_subvolume_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # remove with snapshot retention (should remove volume, no snapshots to retain)
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+ # verify list subvolumes returns an empty list
+ subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+ self.assertEqual(len(subvolumels), 0)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_retain_snapshot_with_snapshots(self):
+ """
+ ensure retain snapshots based delete of a subvolume with snapshots retains the subvolume
+ also test allowed and dis-allowed operations on a retained subvolume
+ """
+ snap_md = ["created_at", "data_pool", "has_pending_clones", "size"]
+
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # remove subvolume -- should fail with ENOTEMPTY since it has snapshots
+ try:
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of retained subvolume with snapshots")
+ else:
+ self.fail("expected rm of subvolume with retained snapshots to fail")
+
+ # remove with snapshot retention
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+ # fetch info
+ subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
+ self.assertEqual(subvol_info["state"], "snapshot-retained",
+ msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+
+ ## test allowed ops in retained state
+ # ls
+ subvolumes = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+ self.assertEqual(len(subvolumes), 1, "subvolume ls count mismatch, expected '1', found {0}".format(len(subvolumes)))
+ self.assertEqual(subvolumes[0]['name'], subvolume,
+ "subvolume name mismatch in ls output, expected '{0}', found '{1}'".format(subvolume, subvolumes[0]['name']))
+
+ # snapshot info
+ snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot))
+ for md in snap_md:
+ self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
+ self.assertEqual(snap_info["has_pending_clones"], "no")
+
+ # rm --force (allowed but should fail)
+ try:
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--force")
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of subvolume with retained snapshots")
+ else:
+ self.fail("expected rm of subvolume with retained snapshots to fail")
+
+ # rm (allowed but should fail)
+ try:
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOTEMPTY, "invalid error code on rm of subvolume with retained snapshots")
+ else:
+ self.fail("expected rm of subvolume with retained snapshots to fail")
+
+ ## test disallowed ops
+ # getpath
+ try:
+ self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on getpath of subvolume with retained snapshots")
+ else:
+ self.fail("expected getpath of subvolume with retained snapshots to fail")
+
+ # resize
+ nsize = self.DEFAULT_FILE_SIZE*1024*1024
+ try:
+ self._fs_cmd("subvolume", "resize", self.volname, subvolume, str(nsize))
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on resize of subvolume with retained snapshots")
+ else:
+ self.fail("expected resize of subvolume with retained snapshots to fail")
+
+ # snap-create
+ try:
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, "fail")
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on snapshot create of subvolume with retained snapshots")
+ else:
+ self.fail("expected snapshot create of subvolume with retained snapshots to fail")
+
+ # remove snapshot (should remove volume)
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # verify list subvolumes returns an empty list
+ subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+ self.assertEqual(len(subvolumels), 0)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_retain_snapshot_invalid_recreate(self):
+ """
+ ensure retained subvolume recreate does not leave any incarnations in the subvolume and trash
+ """
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # remove with snapshot retention
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+ # recreate subvolume with an invalid pool
+ data_pool = "invalid_pool"
+ try:
+ self._fs_cmd("subvolume", "create", self.volname, subvolume, "--pool_layout", data_pool)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.EINVAL, "invalid error code on recreate of subvolume with invalid poolname")
+ else:
+ self.fail("expected recreate of subvolume with invalid poolname to fail")
+
+ # fetch info
+ subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
+ self.assertEqual(subvol_info["state"], "snapshot-retained",
+ msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+
+ # getpath
+ try:
+ self._fs_cmd("subvolume", "getpath", self.volname, subvolume)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.ENOENT, "invalid error code on getpath of subvolume with retained snapshots")
+ else:
+ self.fail("expected getpath of subvolume with retained snapshots to fail")
+
+ # remove snapshot (should remove volume)
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_retain_snapshot_trash_busy_recreate(self):
+ """
+ ensure retained subvolume recreate fails if its trash is not yet purged
+ """
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # remove with snapshot retention
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+ # fake a trash entry
+ self._update_fake_trash(subvolume)
+
+ # recreate subvolume
+ try:
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.EAGAIN, "invalid error code on recreate of subvolume with purge pending")
+ else:
+ self.fail("expected recreate of subvolume with purge pending to fail")
+
+ # clear fake trash entry
+ self._update_fake_trash(subvolume, create=False)
+
+ # recreate subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # remove snapshot
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_retain_snapshot_trash_busy_recreate_clone(self):
+ """
+ ensure retained clone recreate fails if its trash is not yet purged
+ """
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+ clone = self._generate_random_clone_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # clone subvolume snapshot
+ self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+ # check clone status
+ self._wait_for_clone_to_complete(clone)
+
+ # snapshot clone
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, clone, snapshot)
+
+ # remove clone with snapshot retention
+ self._fs_cmd("subvolume", "rm", self.volname, clone, "--retain-snapshots")
+
+ # fake a trash entry
+ self._update_fake_trash(clone)
+
+ # clone subvolume snapshot (recreate)
+ try:
+ self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+ except CommandFailedError as ce:
+ self.assertEqual(ce.exitstatus, errno.EAGAIN, "invalid error code on recreate of clone with purge pending")
+ else:
+ self.fail("expected recreate of clone with purge pending to fail")
+
+ # clear fake trash entry
+ self._update_fake_trash(clone, create=False)
+
+ # recreate subvolume
+ self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+ # check clone status
+ self._wait_for_clone_to_complete(clone)
+
+ # remove snapshot
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, clone, snapshot)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume)
+ self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_retain_snapshot_recreate_subvolume(self):
+ """
+ ensure a retained subvolume can be recreated and further snapshotted
+ """
+ snap_md = ["created_at", "data_pool", "has_pending_clones", "size"]
+
+ subvolume = self._generate_random_subvolume_name()
+ snapshot1, snapshot2 = self._generate_random_snapshot_name(2)
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot1)
+
+ # remove with snapshot retention
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+ # fetch info
+ subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
+ self.assertEqual(subvol_info["state"], "snapshot-retained",
+ msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+
+ # recreate retained subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # fetch info
+ subvol_info = json.loads(self._fs_cmd("subvolume", "info", self.volname, subvolume))
+ self.assertEqual(subvol_info["state"], "complete",
+ msg="expected state to be 'snapshot-retained', found '{0}".format(subvol_info["state"]))
+
+ # snapshot info (older snapshot)
+ snap_info = json.loads(self._get_subvolume_snapshot_info(self.volname, subvolume, snapshot1))
+ for md in snap_md:
+ self.assertIn(md, snap_info, "'{0}' key not present in metadata of snapshot".format(md))
+ self.assertEqual(snap_info["has_pending_clones"], "no")
+
+ # snap-create (new snapshot)
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot2)
+
+ # remove with retain snapshots
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+ # list snapshots
+ subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume))
+ self.assertEqual(len(subvolsnapshotls), 2, "Expected the 'fs subvolume snapshot ls' command to list the"
+ " created subvolume snapshots")
+ snapshotnames = [snapshot['name'] for snapshot in subvolsnapshotls]
+ for snap in [snapshot1, snapshot2]:
+ self.assertIn(snap, snapshotnames, "Missing snapshot '{0}' in snapshot list".format(snap))
+
+ # remove snapshots (should remove volume)
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot1)
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot2)
+
+ # verify list subvolumes returns an empty list
+ subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+ self.assertEqual(len(subvolumels), 0)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_subvolume_retain_snapshot_clone(self):
+ """
+ clone a snapshot from a snapshot retained subvolume
+ """
+ subvolume = self._generate_random_subvolume_name()
+ snapshot = self._generate_random_snapshot_name()
+ clone = self._generate_random_clone_name()
+
+ # create subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+
+ # store path for clone verification
+ subvol_path = self._get_subvolume_path(self.volname, subvolume)
+
+ # do some IO
+ self._do_subvolume_io(subvolume, number_of_files=16)
+
+ # snapshot subvolume
+ self._fs_cmd("subvolume", "snapshot", "create", self.volname, subvolume, snapshot)
+
+ # remove with snapshot retention
+ self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--retain-snapshots")
+
+ # clone retained subvolume snapshot
+ self._fs_cmd("subvolume", "snapshot", "clone", self.volname, subvolume, snapshot, clone)
+
+ # check clone status
+ self._wait_for_clone_to_complete(clone)
+
+ # verify clone
+ self._verify_clone(subvolume, snapshot, clone, subvol_path=subvol_path)
+
+ # remove snapshots (removes retained volume)
+ self._fs_cmd("subvolume", "snapshot", "rm", self.volname, subvolume, snapshot)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, clone)
+
+ # verify list subvolumes returns an empty list
+ subvolumels = json.loads(self._fs_cmd('subvolume', 'ls', self.volname))
+ self.assertEqual(len(subvolumels), 0)