# verify trash dir is clean
self._wait_for_trash_empty()
+
+ def test_malicious_metafile_on_legacy_to_v1_upgrade(self):
+ """
+ Validate handcrafted .meta file on legacy subvol root doesn't break the system
+ on legacy subvol upgrade to v1
+ poor man's upgrade test -- theme continues...
+ """
+ subvol1, subvol2 = self._generate_random_subvolume_name(2)
+ group = self._generate_random_group_name()
+
+ # emulate a old-fashioned subvolume in the default group
+ createpath1 = os.path.join(".", "volumes", "_nogroup", subvol1)
+ self.mount_a.run_shell(['mkdir', '-p', createpath1], sudo=True)
+
+ # add required xattrs to subvolume
+ default_pool = self.mount_a.getfattr(".", "ceph.dir.layout.pool")
+ self.mount_a.setfattr(createpath1, 'ceph.dir.layout.pool', default_pool, sudo=True)
+
+ # create v2 subvolume
+ self._fs_cmd("subvolume", "create", self.volname, subvol2)
+
+ # Create malicious .meta file in legacy subvolume root. Copy v2 subvolume
+ # .meta into legacy subvol1's root
+ subvol2_metapath = os.path.join(".", "volumes", "_nogroup", subvol2, ".meta")
+ self.mount_a.run_shell(["cp", subvol2_metapath, createpath1], sudo=True)
+
+ # Upgrade legacy subvol1 to v1
+ subvolpath1 = self._fs_cmd("subvolume", "getpath", self.volname, subvol1)
+ self.assertNotEqual(subvolpath1, None)
+ subvolpath1 = subvolpath1.rstrip()
+
+ # the subvolume path returned should not be of subvol2 from handcrafted
+ # .meta file
+ self.assertEqual(createpath1[1:], subvolpath1)
+
+ # ensure metadata file is in legacy location, with required version v1
+ self._assert_meta_location_and_version(self.volname, subvol1, version=1, legacy=True)
+
+ # Authorize alice authID read-write access to subvol1. Verify it authorizes subvol1 path and not subvol2
+ # path whose '.meta' file is copied to subvol1 root
+ authid1 = "alice"
+ self._fs_cmd("subvolume", "authorize", self.volname, subvol1, authid1)
+
+ # Validate that the mds path added is of subvol1 and not of subvol2
+ out = json.loads(self.fs.mon_manager.raw_cluster_cmd("auth", "get", "client.alice", "--format=json-pretty"))
+ self.assertEqual("client.alice", out[0]["entity"])
+ self.assertEqual("allow rw path={0}".format(createpath1[1:]), out[0]["caps"]["mds"])
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvol1)
+ self._fs_cmd("subvolume", "rm", self.volname, subvol2)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ def test_binary_metafile_on_legacy_to_v1_upgrade(self):
+ """
+ Validate binary .meta file on legacy subvol root doesn't break the system
+ on legacy subvol upgrade to v1
+ poor man's upgrade test -- theme continues...
+ """
+ subvol = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+
+ # emulate a old-fashioned subvolume -- in a custom group
+ createpath = os.path.join(".", "volumes", group, subvol)
+ self.mount_a.run_shell(['mkdir', '-p', createpath], sudo=True)
+
+ # add required xattrs to subvolume
+ default_pool = self.mount_a.getfattr(".", "ceph.dir.layout.pool")
+ self.mount_a.setfattr(createpath, 'ceph.dir.layout.pool', default_pool, sudo=True)
+
+ # Create unparseable binary .meta file on legacy subvol's root
+ meta_contents = os.urandom(4096)
+ meta_filepath = os.path.join(self.mount_a.mountpoint, createpath, ".meta")
+ self.mount_a.client_remote.write_file(meta_filepath, meta_contents, sudo=True)
+
+ # Upgrade legacy subvol to v1
+ subvolpath = self._fs_cmd("subvolume", "getpath", self.volname, subvol, group)
+ self.assertNotEqual(subvolpath, None)
+ subvolpath = subvolpath.rstrip()
+
+ # The legacy subvolume path should be returned for subvol.
+ # Should ignore unparseable binary .meta file in subvol's root
+ self.assertEqual(createpath[1:], subvolpath)
+
+ # ensure metadata file is in legacy location, with required version v1
+ self._assert_meta_location_and_version(self.volname, subvol, subvol_group=group, version=1, legacy=True)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvol, group)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ # remove group
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
+
+ def test_unparseable_metafile_on_legacy_to_v1_upgrade(self):
+ """
+ Validate unparseable text .meta file on legacy subvol root doesn't break the system
+ on legacy subvol upgrade to v1
+ poor man's upgrade test -- theme continues...
+ """
+ subvol = self._generate_random_subvolume_name()
+ group = self._generate_random_group_name()
+
+ # emulate a old-fashioned subvolume -- in a custom group
+ createpath = os.path.join(".", "volumes", group, subvol)
+ self.mount_a.run_shell(['mkdir', '-p', createpath], sudo=True)
+
+ # add required xattrs to subvolume
+ default_pool = self.mount_a.getfattr(".", "ceph.dir.layout.pool")
+ self.mount_a.setfattr(createpath, 'ceph.dir.layout.pool', default_pool, sudo=True)
+
+ # Create unparseable text .meta file on legacy subvol's root
+ meta_contents = "unparseable config\nfile ...\nunparseable config\nfile ...\n"
+ meta_filepath = os.path.join(self.mount_a.mountpoint, createpath, ".meta")
+ self.mount_a.client_remote.write_file(meta_filepath, meta_contents, sudo=True)
+
+ # Upgrade legacy subvol to v1
+ subvolpath = self._fs_cmd("subvolume", "getpath", self.volname, subvol, group)
+ self.assertNotEqual(subvolpath, None)
+ subvolpath = subvolpath.rstrip()
+
+ # The legacy subvolume path should be returned for subvol.
+ # Should ignore unparseable binary .meta file in subvol's root
+ self.assertEqual(createpath[1:], subvolpath)
+
+ # ensure metadata file is in legacy location, with required version v1
+ self._assert_meta_location_and_version(self.volname, subvol, subvol_group=group, version=1, legacy=True)
+
+ # remove subvolume
+ self._fs_cmd("subvolume", "rm", self.volname, subvol, group)
+
+ # verify trash dir is clean
+ self._wait_for_trash_empty()
+
+ # remove group
+ self._fs_cmd("subvolumegroup", "rm", self.volname, group)
import logging
from hashlib import md5
from typing import Dict, Union
+from pathlib import Path
import cephfs
from ...fs_util import get_ancestor_xattr
from ...exception import MetadataMgrException, VolumeException
from .auth_metadata import AuthMetadataManager
+from .subvolume_attrs import SubvolumeStates
log = logging.getLogger(__name__)
@property
def state(self):
""" Subvolume state, one of SubvolumeStates """
- raise NotImplementedError
+ return SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
@property
def subvol_type(self):
raise NotImplementedError
def load_config(self):
+ try:
+ self.fs.stat(self.legacy_config_path)
+ self.legacy_mode = True
+ except cephfs.Error as e:
+ pass
+
+ log.debug("loading config "
+ "'{0}' [mode: {1}]".format(self.subvolname, "legacy"
+ if self.legacy_mode else "new"))
if self.legacy_mode:
self.metadata_mgr = MetadataManager(self.fs,
self.legacy_config_path,
self.fs.stat(self.base_path)
self.metadata_mgr.refresh()
log.debug("loaded subvolume '{0}'".format(self.subvolname))
+ subvolpath = self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH)
+ # subvolume with retained snapshots has empty path, don't mistake it for
+ # fabricated metadata.
+ if (not self.legacy_mode and self.state != SubvolumeStates.STATE_RETAINED and
+ self.base_path.decode('utf-8') != str(Path(subvolpath).parent)):
+ raise MetadataMgrException(-errno.ENOENT, 'fabricated .meta')
except MetadataMgrException as me:
- if me.errno == -errno.ENOENT and not self.legacy_mode:
+ if me.errno in (-errno.ENOENT, -errno.EINVAL) and not self.legacy_mode:
+ log.warn("subvolume '{0}', {1}, "
+ "assuming legacy_mode".format(self.subvolname, me.error_str))
self.legacy_mode = True
self.load_config()
self.discover()