from ceph.deployment.service_spec import NFSServiceSpec
from nfs import Module
-from nfs.export import ExportMgr
-from nfs.export_utils import GaneshaConfParser, Export, RawBlock
+from nfs.export import ExportMgr, normalize_path
+from nfs.ganesha_conf import GaneshaConfParser, Export, RawBlock
from nfs.cluster import NFSCluster
from orchestrator import ServiceDescription, DaemonDescription, OrchResult
mock.patch('nfs.cluster.restart_nfs_service'), \
mock.patch.object(MgrModule, 'tool_exec', mock_exec), \
mock.patch('nfs.export.check_fs', return_value=True), \
- mock.patch('nfs.export_utils.check_fs', return_value=True), \
+ mock.patch('nfs.ganesha_conf.check_fs', return_value=True), \
mock.patch('nfs.export.ExportMgr._create_user_key',
return_value='thekeyforclientabc'):
blocks = GaneshaConfParser(block).parse()
export = Export.from_export_block(blocks[0], self.cluster_id)
nfs_mod = Module('nfs', '', '')
- with mock.patch('nfs.export_utils.check_fs', return_value=True):
+ with mock.patch('nfs.ganesha_conf.check_fs', return_value=True):
export.validate(nfs_mod)
def test_update_export(self):
def test_cluster_config(self):
self._do_mock_test(self._do_test_cluster_config)
+
+
+@pytest.mark.parametrize(
+ "path,expected",
+ [
+ ("/foo/bar/baz", "/foo/bar/baz"),
+ ("/foo/bar/baz/", "/foo/bar/baz"),
+ ("/foo/bar/baz ", "/foo/bar/baz"),
+ ("/foo/./bar/baz", "/foo/bar/baz"),
+ ("/foo/bar/baz/..", "/foo/bar"),
+ ("//foo/bar/baz", "/foo/bar/baz"),
+ ("", ""),
+ ]
+)
+def test_normalize_path(path, expected):
+ assert normalize_path(path) == expected
+
+
+def test_ganesha_validate_squash():
+ """Check error handling of internal validation function for squash value."""
+ from nfs.ganesha_conf import _validate_squash
+ from nfs.exception import NFSInvalidOperation
+
+ _validate_squash("root")
+ with pytest.raises(NFSInvalidOperation):
+ _validate_squash("toot")
+
+
+def test_ganesha_validate_access_type():
+ """Check error handling of internal validation function for access type value."""
+ from nfs.ganesha_conf import _validate_access_type
+ from nfs.exception import NFSInvalidOperation
+
+ for ok in ("rw", "ro", "none"):
+ _validate_access_type(ok)
+ with pytest.raises(NFSInvalidOperation):
+ _validate_access_type("any")