]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/nfs/tests/test_nfs.py
4 from typing
import Optional
, Tuple
, Iterator
, List
, Any
6 from contextlib
import contextmanager
7 from unittest
import mock
8 from unittest
.mock
import MagicMock
9 from mgr_module
import MgrModule
, NFS_POOL_NAME
11 from rados
import ObjectNotFound
13 from ceph
.deployment
.service_spec
import NFSServiceSpec
14 from nfs
import Module
15 from nfs
.export
import ExportMgr
, normalize_path
16 from nfs
.ganesha_conf
import GaneshaConfParser
, Export
, RawBlock
17 from nfs
.cluster
import NFSCluster
18 from orchestrator
import ServiceDescription
, DaemonDescription
, OrchResult
31 Attr_Expiration_Time = 0;
38 # Secret_Access_Key = "YOUR SECRET KEY HERE";
43 Clients = 192.168.0.10, 192.168.1.0/8;
49 Clients = 192.168.0.0/16;
63 squash = AllAnonymous;
65 Transports = TCP, UDP;
69 User_Id = "nfs.foo.bucket";
70 Access_Key_Id ="the_access_key";
71 Secret_Access_Key = "the_secret_key";
79 user_id = "nfs.foo.1";
81 secret_access_key = "AQCjU+hgjyReLBAAddJa0Dza/ZHqjX5+JiePMA==";
88 attr_expiration_time = 0;
89 security_label = true;
98 user_id = "nfs.foo.1";
100 secret_access_key = "AQCjU+hgjyReLBAAddJa0Dza/ZHqjX5+JiePMA==";
106 squash = "no_root_squash";
107 SecType = "krb5p", "krb5i";
108 attr_expiration_time = 0;
109 security_label = true;
116 %url "rados://{NFS_POOL_NAME}/{cluster_id}/export-1"
118 %url "rados://{NFS_POOL_NAME}/{cluster_id}/export-2"'''
120 class RObject(object):
121 def __init__(self
, key
: str, raw
: str) -> None:
125 def read(self
, _
: Optional
[int]) -> bytes
:
126 return self
.raw
.encode('utf-8')
128 def stat(self
) -> Tuple
[int, None]:
129 return len(self
.raw
), None
131 def _ioctx_write_full_mock(self
, key
: str, content
: bytes
) -> None:
132 if key
not in self
.temp_store
[self
.temp_store_namespace
]:
133 self
.temp_store
[self
.temp_store_namespace
][key
] = \
134 TestNFS
.RObject(key
, content
.decode('utf-8'))
136 self
.temp_store
[self
.temp_store_namespace
][key
].raw
= content
.decode('utf-8')
138 def _ioctx_remove_mock(self
, key
: str) -> None:
139 del self
.temp_store
[self
.temp_store_namespace
][key
]
141 def _ioctx_list_objects_mock(self
) -> List
['TestNFS.RObject']:
142 r
= [obj
for _
, obj
in self
.temp_store
[self
.temp_store_namespace
].items()]
145 def _ioctl_stat_mock(self
, key
):
146 return self
.temp_store
[self
.temp_store_namespace
][key
].stat()
148 def _ioctl_read_mock(self
, key
: str, size
: Optional
[Any
] = None) -> bytes
:
149 if key
not in self
.temp_store
[self
.temp_store_namespace
]:
151 return self
.temp_store
[self
.temp_store_namespace
][key
].read(size
)
153 def _ioctx_set_namespace_mock(self
, namespace
: str) -> None:
154 self
.temp_store_namespace
= namespace
156 def _reset_temp_store(self
) -> None:
157 self
.temp_store_namespace
= None
160 'export-1': TestNFS
.RObject("export-1", self
.export_1
),
161 'export-2': TestNFS
.RObject("export-2", self
.export_2
),
162 'conf-nfs.foo': TestNFS
.RObject("conf-nfs.foo", self
.conf_nfs_foo
)
167 def _mock_orchestrator(self
, enable
: bool) -> Iterator
:
168 self
.io_mock
= MagicMock()
169 self
.io_mock
.set_namespace
.side_effect
= self
._ioctx
_set
_namespace
_mock
170 self
.io_mock
.read
= self
._ioctl
_read
_mock
171 self
.io_mock
.stat
= self
._ioctl
_stat
_mock
172 self
.io_mock
.list_objects
.side_effect
= self
._ioctx
_list
_objects
_mock
173 self
.io_mock
.write_full
.side_effect
= self
._ioctx
_write
_full
_mock
174 self
.io_mock
.remove_object
.side_effect
= self
._ioctx
_remove
_mock
177 orch_nfs_services
= [
178 ServiceDescription(spec
=NFSServiceSpec(service_id
=self
.cluster_id
))
182 DaemonDescription('nfs', 'foo.mydaemon', 'myhostname')
185 def mock_exec(cls
, args
):
186 if args
[1:3] == ['bucket', 'stats']:
188 "owner": "bucket_owner_user",
190 return 0, json
.dumps(bucket_info
), ''
193 "display_name": "foo",
201 "access_key": "the_access_key",
202 "secret_key": "the_secret_key"
207 "op_mask": "read, write, delete",
208 "default_placement": "",
209 "default_storage_class": "",
210 "placement_tags": [],
213 "check_on_raw": False,
220 "check_on_raw": False,
229 if args
[2] == 'list':
230 return 0, json
.dumps([u
]), ''
231 return 0, json
.dumps(u
), ''
233 def mock_describe_service(cls
, *args
, **kwargs
):
234 if kwargs
['service_type'] == 'nfs':
235 return OrchResult(orch_nfs_services
)
236 return OrchResult([])
238 def mock_list_daemons(cls
, *args
, **kwargs
):
239 if kwargs
['daemon_type'] == 'nfs':
240 return OrchResult(orch_nfs_daemons
)
241 return OrchResult([])
243 with mock
.patch('nfs.module.Module.describe_service', mock_describe_service
) as describe_service
, \
244 mock
.patch('nfs.module.Module.list_daemons', mock_list_daemons
) as list_daemons
, \
245 mock
.patch('nfs.module.Module.rados') as rados
, \
246 mock
.patch('nfs.export.available_clusters',
247 return_value
=[self
.cluster_id
]), \
248 mock
.patch('nfs.export.restart_nfs_service'), \
249 mock
.patch('nfs.cluster.restart_nfs_service'), \
250 mock
.patch
.object(MgrModule
, 'tool_exec', mock_exec
), \
251 mock
.patch('nfs.export.check_fs', return_value
=True), \
252 mock
.patch('nfs.ganesha_conf.check_fs', return_value
=True), \
253 mock
.patch('nfs.export.ExportMgr._create_user_key',
254 return_value
='thekeyforclientabc'), \
255 mock
.patch('nfs.export.cephfs_path_is_dir'):
257 rados
.open_ioctx
.return_value
.__enter
__.return_value
= self
.io_mock
258 rados
.open_ioctx
.return_value
.__exit
__ = mock
.Mock(return_value
=None)
260 self
._reset
_temp
_store
()
264 def test_parse_daemon_raw_config(self
) -> None:
265 expected_daemon_config
= [
266 RawBlock('NFS_CORE_PARAM', values
={
268 "enable_rquota": False,
272 RawBlock('MDCACHE', values
={
275 RawBlock('NFSV4', values
={
276 "recoverybackend": "rados_cluster",
277 "minor_versions": [1, 2]
279 RawBlock('RADOS_KV', values
={
280 "pool": NFS_POOL_NAME
,
281 "namespace": "vstart",
285 RawBlock('RADOS_URLS', values
={
287 "watch_url": f
"'rados://{NFS_POOL_NAME}/vstart/conf-nfs.vstart'"
289 RawBlock('%url', values
={
290 "value": f
"rados://{NFS_POOL_NAME}/vstart/conf-nfs.vstart"
293 daemon_raw_config
= """
296 Enable_RQUOTA = false;
306 RecoveryBackend = rados_cluster;
307 Minor_Versions = 1, 2;
319 watch_url = 'rados://{}/vstart/conf-nfs.vstart';
322 %url rados://{}/vstart/conf-nfs.vstart
323 """.replace('{}', NFS_POOL_NAME
)
324 daemon_config
= GaneshaConfParser(daemon_raw_config
).parse()
325 assert daemon_config
== expected_daemon_config
327 def _validate_export_1(self
, export
: Export
):
328 assert export
.export_id
== 1
329 assert export
.path
== "/"
330 assert export
.pseudo
== "/cephfs_a/"
331 assert export
.access_type
== "RW"
332 # assert export.squash == "root_squash" # probably correct value
333 assert export
.squash
== "no_root_squash"
334 assert export
.protocols
== [4]
335 # assert export.transports == {"TCP", "UDP"}
336 assert export
.fsal
.name
== "CEPH"
337 assert export
.fsal
.user_id
== "ganesha"
338 assert export
.fsal
.fs_name
== "a"
339 assert export
.fsal
.sec_label_xattr
== None
340 assert len(export
.clients
) == 2
341 assert export
.clients
[0].addresses
== \
342 ["192.168.0.10", "192.168.1.0/8"]
343 # assert export.clients[0].squash == "no_root_squash" # probably correct value
344 assert export
.clients
[0].squash
== "None"
345 assert export
.clients
[0].access_type
is None
346 assert export
.clients
[1].addresses
== ["192.168.0.0/16"]
347 # assert export.clients[1].squash == "all_squash" # probably correct value
348 assert export
.clients
[1].squash
== "All"
349 assert export
.clients
[1].access_type
== "RO"
350 assert export
.cluster_id
== 'foo'
351 assert export
.attr_expiration_time
== 0
352 # assert export.security_label == False # probably correct value
353 assert export
.security_label
== True
355 def test_export_parser_1(self
) -> None:
356 blocks
= GaneshaConfParser(self
.export_1
).parse()
357 assert isinstance(blocks
, list)
358 assert len(blocks
) == 1
359 export
= Export
.from_export_block(blocks
[0], self
.cluster_id
)
360 self
._validate
_export
_1(export
)
362 def _validate_export_2(self
, export
: Export
):
363 assert export
.export_id
== 2
364 assert export
.path
== "/"
365 assert export
.pseudo
== "/rgw"
366 assert export
.access_type
== "RW"
367 # assert export.squash == "all_squash" # probably correct value
368 assert export
.squash
== "AllAnonymous"
369 assert export
.protocols
== [4, 3]
370 assert set(export
.transports
) == {"TCP", "UDP"}
371 assert export
.fsal
.name
== "RGW"
372 assert export
.fsal
.user_id
== "nfs.foo.bucket"
373 assert export
.fsal
.access_key_id
== "the_access_key"
374 assert export
.fsal
.secret_access_key
== "the_secret_key"
375 assert len(export
.clients
) == 0
376 assert export
.cluster_id
== 'foo'
378 def test_export_parser_2(self
) -> None:
379 blocks
= GaneshaConfParser(self
.export_2
).parse()
380 assert isinstance(blocks
, list)
381 assert len(blocks
) == 1
382 export
= Export
.from_export_block(blocks
[0], self
.cluster_id
)
383 self
._validate
_export
_2(export
)
385 def test_daemon_conf_parser(self
) -> None:
386 blocks
= GaneshaConfParser(self
.conf_nfs_foo
).parse()
387 assert isinstance(blocks
, list)
388 assert len(blocks
) == 2
389 assert blocks
[0].block_name
== "%url"
390 assert blocks
[0].values
['value'] == f
"rados://{NFS_POOL_NAME}/{self.cluster_id}/export-1"
391 assert blocks
[1].block_name
== "%url"
392 assert blocks
[1].values
['value'] == f
"rados://{NFS_POOL_NAME}/{self.cluster_id}/export-2"
394 def _do_mock_test(self
, func
, *args
) -> None:
395 with self
._mock
_orchestrator
(True):
397 self
._reset
_temp
_store
()
399 def test_ganesha_conf(self
) -> None:
400 self
._do
_mock
_test
(self
._do
_test
_ganesha
_conf
)
402 def _do_test_ganesha_conf(self
) -> None:
403 nfs_mod
= Module('nfs', '', '')
404 ganesha_conf
= ExportMgr(nfs_mod
)
405 exports
= ganesha_conf
.exports
[self
.cluster_id
]
407 assert len(exports
) == 2
409 self
._validate
_export
_1([e
for e
in exports
if e
.export_id
== 1][0])
410 self
._validate
_export
_2([e
for e
in exports
if e
.export_id
== 2][0])
412 def test_config_dict(self
) -> None:
413 self
._do
_mock
_test
(self
._do
_test
_config
_dict
)
415 def _do_test_config_dict(self
) -> None:
416 nfs_mod
= Module('nfs', '', '')
417 conf
= ExportMgr(nfs_mod
)
418 export
= [e
for e
in conf
.exports
['foo'] if e
.export_id
== 1][0]
419 ex_dict
= export
.to_dict()
421 assert ex_dict
== {'access_type': 'RW',
422 'clients': [{'access_type': None,
423 'addresses': ['192.168.0.10', '192.168.1.0/8'],
425 {'access_type': 'RO',
426 'addresses': ['192.168.0.0/16'],
428 'cluster_id': self
.cluster_id
,
430 'fsal': {'fs_name': 'a', 'name': 'CEPH', 'user_id': 'ganesha'},
433 'pseudo': '/cephfs_a/',
434 'security_label': True,
435 'squash': 'no_root_squash',
438 export
= [e
for e
in conf
.exports
['foo'] if e
.export_id
== 2][0]
439 ex_dict
= export
.to_dict()
440 assert ex_dict
== {'access_type': 'RW',
442 'cluster_id': self
.cluster_id
,
444 'fsal': {'name': 'RGW',
445 'access_key_id': 'the_access_key',
446 'secret_access_key': 'the_secret_key',
447 'user_id': 'nfs.foo.bucket'},
451 'security_label': True,
452 'squash': 'AllAnonymous',
453 'transports': ['TCP', 'UDP']}
455 def test_config_from_dict(self
) -> None:
456 self
._do
_mock
_test
(self
._do
_test
_config
_from
_dict
)
458 def _do_test_config_from_dict(self
) -> None:
459 export
= Export
.from_dict(1, {
462 'cluster_id': self
.cluster_id
,
463 'pseudo': '/cephfs_a',
465 'squash': 'root_squash',
466 'security_label': True,
468 'transports': ['TCP', 'UDP'],
470 'addresses': ["192.168.0.10", "192.168.1.0/8"],
472 'squash': 'no_root_squash'
474 'addresses': ["192.168.0.0/16"],
476 'squash': 'all_squash'
480 'user_id': 'ganesha',
482 'sec_label_xattr': 'security.selinux'
486 assert export
.export_id
== 1
487 assert export
.path
== "/"
488 assert export
.pseudo
== "/cephfs_a"
489 assert export
.access_type
== "RW"
490 assert export
.squash
== "root_squash"
491 assert set(export
.protocols
) == {4}
492 assert set(export
.transports
) == {"TCP", "UDP"}
493 assert export
.fsal
.name
== "CEPH"
494 assert export
.fsal
.user_id
== "ganesha"
495 assert export
.fsal
.fs_name
== "a"
496 assert export
.fsal
.sec_label_xattr
== 'security.selinux'
497 assert len(export
.clients
) == 2
498 assert export
.clients
[0].addresses
== \
499 ["192.168.0.10", "192.168.1.0/8"]
500 assert export
.clients
[0].squash
== "no_root_squash"
501 assert export
.clients
[0].access_type
is None
502 assert export
.clients
[1].addresses
== ["192.168.0.0/16"]
503 assert export
.clients
[1].squash
== "all_squash"
504 assert export
.clients
[1].access_type
== "RO"
505 assert export
.cluster_id
== self
.cluster_id
506 assert export
.attr_expiration_time
== 0
507 assert export
.security_label
509 export
= Export
.from_dict(2, {
513 'cluster_id': self
.cluster_id
,
515 'squash': 'all_squash',
516 'security_label': False,
518 'transports': ['TCP', 'UDP'],
522 'user_id': 'rgw.foo.bucket',
523 'access_key_id': 'the_access_key',
524 'secret_access_key': 'the_secret_key'
528 assert export
.export_id
== 2
529 assert export
.path
== "bucket"
530 assert export
.pseudo
== "/rgw"
531 assert export
.access_type
== "RW"
532 assert export
.squash
== "all_squash"
533 assert set(export
.protocols
) == {4, 3}
534 assert set(export
.transports
) == {"TCP", "UDP"}
535 assert export
.fsal
.name
== "RGW"
536 assert export
.fsal
.user_id
== "rgw.foo.bucket"
537 assert export
.fsal
.access_key_id
== "the_access_key"
538 assert export
.fsal
.secret_access_key
== "the_secret_key"
539 assert len(export
.clients
) == 0
540 assert export
.cluster_id
== self
.cluster_id
542 @pytest.mark
.parametrize(
549 def test_export_from_to_export_block(self
, block
):
550 blocks
= GaneshaConfParser(block
).parse()
551 export
= Export
.from_export_block(blocks
[0], self
.cluster_id
)
552 newblock
= export
.to_export_block()
553 export2
= Export
.from_export_block(newblock
, self
.cluster_id
)
554 newblock2
= export2
.to_export_block()
555 assert newblock
== newblock2
557 @pytest.mark
.parametrize(
564 def test_export_from_to_dict(self
, block
):
565 blocks
= GaneshaConfParser(block
).parse()
566 export
= Export
.from_export_block(blocks
[0], self
.cluster_id
)
568 export2
= Export
.from_dict(j
['export_id'], j
)
569 j2
= export2
.to_dict()
572 @pytest.mark
.parametrize(
579 def test_export_validate(self
, block
):
580 blocks
= GaneshaConfParser(block
).parse()
581 export
= Export
.from_export_block(blocks
[0], self
.cluster_id
)
582 nfs_mod
= Module('nfs', '', '')
583 with mock
.patch('nfs.ganesha_conf.check_fs', return_value
=True):
584 export
.validate(nfs_mod
)
586 def test_update_export(self
):
587 self
._do
_mock
_test
(self
._do
_test
_update
_export
)
589 def _do_test_update_export(self
):
590 nfs_mod
= Module('nfs', '', '')
591 conf
= ExportMgr(nfs_mod
)
592 r
= conf
.apply_export(self
.cluster_id
, json
.dumps({
595 'pseudo': '/rgw/bucket',
596 'cluster_id': self
.cluster_id
,
598 'squash': 'all_squash',
599 'security_label': False,
601 'transports': ['TCP', 'UDP'],
603 'addresses': ["192.168.0.0/16"],
609 'user_id': 'nfs.foo.bucket',
610 'access_key_id': 'the_access_key',
611 'secret_access_key': 'the_secret_key',
614 assert len(r
.changes
) == 1
616 export
= conf
._fetch
_export
('foo', '/rgw/bucket')
617 assert export
.export_id
== 2
618 assert export
.path
== "bucket"
619 assert export
.pseudo
== "/rgw/bucket"
620 assert export
.access_type
== "RW"
621 assert export
.squash
== "all_squash"
622 assert export
.protocols
== [4, 3]
623 assert export
.transports
== ["TCP", "UDP"]
624 assert export
.fsal
.name
== "RGW"
625 assert export
.fsal
.access_key_id
== "the_access_key"
626 assert export
.fsal
.secret_access_key
== "the_secret_key"
627 assert len(export
.clients
) == 1
628 assert export
.clients
[0].squash
is None
629 assert export
.clients
[0].access_type
is None
630 assert export
.cluster_id
== self
.cluster_id
632 # do it again, with changes
633 r
= conf
.apply_export(self
.cluster_id
, json
.dumps({
636 'pseudo': '/rgw/bucket',
637 'cluster_id': self
.cluster_id
,
640 'security_label': False,
642 'transports': ['TCP'],
644 'addresses': ["192.168.10.0/16"],
650 'user_id': 'nfs.foo.newbucket',
651 'access_key_id': 'the_access_key',
652 'secret_access_key': 'the_secret_key',
655 assert len(r
.changes
) == 1
657 export
= conf
._fetch
_export
('foo', '/rgw/bucket')
658 assert export
.export_id
== 2
659 assert export
.path
== "newbucket"
660 assert export
.pseudo
== "/rgw/bucket"
661 assert export
.access_type
== "RO"
662 assert export
.squash
== "root"
663 assert export
.protocols
== [4]
664 assert export
.transports
== ["TCP"]
665 assert export
.fsal
.name
== "RGW"
666 assert export
.fsal
.access_key_id
== "the_access_key"
667 assert export
.fsal
.secret_access_key
== "the_secret_key"
668 assert len(export
.clients
) == 1
669 assert export
.clients
[0].squash
is None
670 assert export
.clients
[0].access_type
is None
671 assert export
.cluster_id
== self
.cluster_id
673 # again, but without export_id
674 r
= conf
.apply_export(self
.cluster_id
, json
.dumps({
675 'path': 'newestbucket',
676 'pseudo': '/rgw/bucket',
677 'cluster_id': self
.cluster_id
,
680 'security_label': False,
682 'transports': ['TCP'],
684 'addresses': ["192.168.10.0/16"],
690 'user_id': 'nfs.foo.newestbucket',
691 'access_key_id': 'the_access_key',
692 'secret_access_key': 'the_secret_key',
695 assert len(r
.changes
) == 1
697 export
= conf
._fetch
_export
(self
.cluster_id
, '/rgw/bucket')
698 assert export
.export_id
== 2
699 assert export
.path
== "newestbucket"
700 assert export
.pseudo
== "/rgw/bucket"
701 assert export
.access_type
== "RW"
702 assert export
.squash
== "root"
703 assert export
.protocols
== [4]
704 assert export
.transports
== ["TCP"]
705 assert export
.fsal
.name
== "RGW"
706 assert export
.fsal
.access_key_id
== "the_access_key"
707 assert export
.fsal
.secret_access_key
== "the_secret_key"
708 assert len(export
.clients
) == 1
709 assert export
.clients
[0].squash
is None
710 assert export
.clients
[0].access_type
is None
711 assert export
.cluster_id
== self
.cluster_id
713 def test_update_export_sectype(self
):
714 self
._do
_mock
_test
(self
._test
_update
_export
_sectype
)
716 def _test_update_export_sectype(self
):
717 nfs_mod
= Module('nfs', '', '')
718 conf
= ExportMgr(nfs_mod
)
719 r
= conf
.apply_export(self
.cluster_id
, json
.dumps({
722 'pseudo': '/rgw/bucket',
723 'cluster_id': self
.cluster_id
,
725 'squash': 'all_squash',
726 'security_label': False,
728 'transports': ['TCP', 'UDP'],
730 'addresses': ["192.168.0.0/16"],
736 'user_id': 'nfs.foo.bucket',
737 'access_key_id': 'the_access_key',
738 'secret_access_key': 'the_secret_key',
741 assert len(r
.changes
) == 1
743 # no sectype was given, key not present
744 info
= conf
._get
_export
_dict
(self
.cluster_id
, "/rgw/bucket")
745 assert info
["export_id"] == 2
746 assert info
["path"] == "bucket"
747 assert "sectype" not in info
749 r
= conf
.apply_export(self
.cluster_id
, json
.dumps({
752 'pseudo': '/rgw/bucket',
753 'cluster_id': self
.cluster_id
,
755 'squash': 'all_squash',
756 'security_label': False,
758 'transports': ['TCP', 'UDP'],
760 'addresses': ["192.168.0.0/16"],
764 'sectype': ["krb5p", "krb5i", "sys"],
767 'user_id': 'nfs.foo.bucket',
768 'access_key_id': 'the_access_key',
769 'secret_access_key': 'the_secret_key',
772 assert len(r
.changes
) == 1
774 # assert sectype matches new value(s)
775 info
= conf
._get
_export
_dict
(self
.cluster_id
, "/rgw/bucket")
776 assert info
["export_id"] == 2
777 assert info
["path"] == "bucket"
778 assert info
["sectype"] == ["krb5p", "krb5i", "sys"]
780 def test_update_export_with_ganesha_conf(self
):
781 self
._do
_mock
_test
(self
._do
_test
_update
_export
_with
_ganesha
_conf
)
783 def _do_test_update_export_with_ganesha_conf(self
):
784 nfs_mod
= Module('nfs', '', '')
785 conf
= ExportMgr(nfs_mod
)
786 r
= conf
.apply_export(self
.cluster_id
, self
.export_3
)
787 assert len(r
.changes
) == 1
789 def test_update_export_with_ganesha_conf_sectype(self
):
791 self
._do
_test
_update
_export
_with
_ganesha
_conf
_sectype
,
792 self
.export_4
, ["krb5p", "krb5i"])
794 def test_update_export_with_ganesha_conf_sectype_lcase(self
):
795 export_conf
= self
.export_4
.replace("SecType", "sectype").replace("krb5i", "sys")
797 self
._do
_test
_update
_export
_with
_ganesha
_conf
_sectype
,
798 export_conf
, ["krb5p", "sys"])
800 def _do_test_update_export_with_ganesha_conf_sectype(self
, export_conf
, expect_sectype
):
801 nfs_mod
= Module('nfs', '', '')
802 conf
= ExportMgr(nfs_mod
)
803 r
= conf
.apply_export(self
.cluster_id
, export_conf
)
804 assert len(r
.changes
) == 1
806 # assert sectype matches new value(s)
807 info
= conf
._get
_export
_dict
(self
.cluster_id
, "/secure1")
808 assert info
["export_id"] == 1
809 assert info
["path"] == "/secure/me"
810 assert info
["sectype"] == expect_sectype
812 def test_update_export_with_list(self
):
813 self
._do
_mock
_test
(self
._do
_test
_update
_export
_with
_list
)
815 def _do_test_update_export_with_list(self
):
816 nfs_mod
= Module('nfs', '', '')
817 conf
= ExportMgr(nfs_mod
)
818 r
= conf
.apply_export(self
.cluster_id
, json
.dumps([
821 'pseudo': '/rgw/bucket',
822 'cluster_id': self
.cluster_id
,
825 'security_label': False,
827 'transports': ['TCP'],
829 'addresses': ["192.168.0.0/16"],
835 'user_id': 'nfs.foo.bucket',
836 'access_key_id': 'the_access_key',
837 'secret_access_key': 'the_secret_key',
842 'pseudo': '/rgw/bucket2',
843 'cluster_id': self
.cluster_id
,
846 'security_label': False,
848 'transports': ['TCP'],
850 'addresses': ["192.168.0.0/16"],
856 'user_id': 'nfs.foo.bucket2',
857 'access_key_id': 'the_access_key',
858 'secret_access_key': 'the_secret_key',
862 # The input object above contains TWO items (two different pseudo paths)
863 # therefore we expect the result to report that two changes have been
864 # applied, rather than the typical 1 change.
865 assert len(r
.changes
) == 2
867 export
= conf
._fetch
_export
('foo', '/rgw/bucket')
868 assert export
.export_id
== 3
869 assert export
.path
== "bucket"
870 assert export
.pseudo
== "/rgw/bucket"
871 assert export
.access_type
== "RW"
872 assert export
.squash
== "root"
873 assert export
.protocols
== [4]
874 assert export
.transports
== ["TCP"]
875 assert export
.fsal
.name
== "RGW"
876 assert export
.fsal
.access_key_id
== "the_access_key"
877 assert export
.fsal
.secret_access_key
== "the_secret_key"
878 assert len(export
.clients
) == 1
879 assert export
.clients
[0].squash
is None
880 assert export
.clients
[0].access_type
is None
881 assert export
.cluster_id
== self
.cluster_id
883 export
= conf
._fetch
_export
('foo', '/rgw/bucket2')
884 assert export
.export_id
== 4
885 assert export
.path
== "bucket2"
886 assert export
.pseudo
== "/rgw/bucket2"
887 assert export
.access_type
== "RO"
888 assert export
.squash
== "root"
889 assert export
.protocols
== [4]
890 assert export
.transports
== ["TCP"]
891 assert export
.fsal
.name
== "RGW"
892 assert export
.fsal
.access_key_id
== "the_access_key"
893 assert export
.fsal
.secret_access_key
== "the_secret_key"
894 assert len(export
.clients
) == 1
895 assert export
.clients
[0].squash
is None
896 assert export
.clients
[0].access_type
is None
897 assert export
.cluster_id
== self
.cluster_id
899 def test_remove_export(self
) -> None:
900 self
._do
_mock
_test
(self
._do
_test
_remove
_export
)
902 def _do_test_remove_export(self
) -> None:
903 nfs_mod
= Module('nfs', '', '')
904 conf
= ExportMgr(nfs_mod
)
905 assert len(conf
.exports
[self
.cluster_id
]) == 2
906 conf
.delete_export(cluster_id
=self
.cluster_id
,
908 exports
= conf
.exports
[self
.cluster_id
]
909 assert len(exports
) == 1
910 assert exports
[0].export_id
== 1
912 def test_create_export_rgw_bucket(self
):
913 self
._do
_mock
_test
(self
._do
_test
_create
_export
_rgw
_bucket
)
915 def _do_test_create_export_rgw_bucket(self
):
916 nfs_mod
= Module('nfs', '', '')
917 conf
= ExportMgr(nfs_mod
)
919 ls
= conf
.list_exports(cluster_id
=self
.cluster_id
)
922 r
= conf
.create_export(
924 cluster_id
=self
.cluster_id
,
926 pseudo_path
='/mybucket',
929 addr
=["192.168.0.0/16"]
931 assert r
["bind"] == "/mybucket"
933 ls
= conf
.list_exports(cluster_id
=self
.cluster_id
)
936 export
= conf
._fetch
_export
('foo', '/mybucket')
937 assert export
.export_id
938 assert export
.path
== "bucket"
939 assert export
.pseudo
== "/mybucket"
940 assert export
.access_type
== "none"
941 assert export
.squash
== "none"
942 assert export
.protocols
== [4]
943 assert export
.transports
== ["TCP"]
944 assert export
.fsal
.name
== "RGW"
945 assert export
.fsal
.user_id
== "bucket_owner_user"
946 assert export
.fsal
.access_key_id
== "the_access_key"
947 assert export
.fsal
.secret_access_key
== "the_secret_key"
948 assert len(export
.clients
) == 1
949 assert export
.clients
[0].squash
== 'root'
950 assert export
.clients
[0].access_type
== 'rw'
951 assert export
.clients
[0].addresses
== ["192.168.0.0/16"]
952 assert export
.cluster_id
== self
.cluster_id
954 def test_create_export_rgw_bucket_user(self
):
955 self
._do
_mock
_test
(self
._do
_test
_create
_export
_rgw
_bucket
_user
)
957 def _do_test_create_export_rgw_bucket_user(self
):
958 nfs_mod
= Module('nfs', '', '')
959 conf
= ExportMgr(nfs_mod
)
961 ls
= conf
.list_exports(cluster_id
=self
.cluster_id
)
964 r
= conf
.create_export(
966 cluster_id
=self
.cluster_id
,
968 user_id
='other_user',
969 pseudo_path
='/mybucket',
972 addr
=["192.168.0.0/16"]
974 assert r
["bind"] == "/mybucket"
976 ls
= conf
.list_exports(cluster_id
=self
.cluster_id
)
979 export
= conf
._fetch
_export
('foo', '/mybucket')
980 assert export
.export_id
981 assert export
.path
== "bucket"
982 assert export
.pseudo
== "/mybucket"
983 assert export
.access_type
== "none"
984 assert export
.squash
== "none"
985 assert export
.protocols
== [4]
986 assert export
.transports
== ["TCP"]
987 assert export
.fsal
.name
== "RGW"
988 assert export
.fsal
.access_key_id
== "the_access_key"
989 assert export
.fsal
.secret_access_key
== "the_secret_key"
990 assert len(export
.clients
) == 1
991 assert export
.clients
[0].squash
== 'root'
992 assert export
.fsal
.user_id
== "other_user"
993 assert export
.clients
[0].access_type
== 'rw'
994 assert export
.clients
[0].addresses
== ["192.168.0.0/16"]
995 assert export
.cluster_id
== self
.cluster_id
997 def test_create_export_rgw_user(self
):
998 self
._do
_mock
_test
(self
._do
_test
_create
_export
_rgw
_user
)
1000 def _do_test_create_export_rgw_user(self
):
1001 nfs_mod
= Module('nfs', '', '')
1002 conf
= ExportMgr(nfs_mod
)
1004 ls
= conf
.list_exports(cluster_id
=self
.cluster_id
)
1007 r
= conf
.create_export(
1009 cluster_id
=self
.cluster_id
,
1010 user_id
='some_user',
1011 pseudo_path
='/mybucket',
1014 addr
=["192.168.0.0/16"]
1016 assert r
["bind"] == "/mybucket"
1018 ls
= conf
.list_exports(cluster_id
=self
.cluster_id
)
1021 export
= conf
._fetch
_export
('foo', '/mybucket')
1022 assert export
.export_id
1023 assert export
.path
== "/"
1024 assert export
.pseudo
== "/mybucket"
1025 assert export
.access_type
== "none"
1026 assert export
.squash
== "none"
1027 assert export
.protocols
== [4]
1028 assert export
.transports
== ["TCP"]
1029 assert export
.fsal
.name
== "RGW"
1030 assert export
.fsal
.access_key_id
== "the_access_key"
1031 assert export
.fsal
.secret_access_key
== "the_secret_key"
1032 assert len(export
.clients
) == 1
1033 assert export
.clients
[0].squash
== 'root'
1034 assert export
.fsal
.user_id
== "some_user"
1035 assert export
.clients
[0].access_type
== 'rw'
1036 assert export
.clients
[0].addresses
== ["192.168.0.0/16"]
1037 assert export
.cluster_id
== self
.cluster_id
1039 def test_create_export_cephfs(self
):
1040 self
._do
_mock
_test
(self
._do
_test
_create
_export
_cephfs
)
1042 def _do_test_create_export_cephfs(self
):
1043 nfs_mod
= Module('nfs', '', '')
1044 conf
= ExportMgr(nfs_mod
)
1046 ls
= conf
.list_exports(cluster_id
=self
.cluster_id
)
1049 r
= conf
.create_export(
1051 cluster_id
=self
.cluster_id
,
1054 pseudo_path
='/cephfs2',
1057 addr
=["192.168.1.0/8"],
1059 assert r
["bind"] == "/cephfs2"
1061 ls
= conf
.list_exports(cluster_id
=self
.cluster_id
)
1064 export
= conf
._fetch
_export
('foo', '/cephfs2')
1065 assert export
.export_id
1066 assert export
.path
== "/"
1067 assert export
.pseudo
== "/cephfs2"
1068 assert export
.access_type
== "none"
1069 assert export
.squash
== "none"
1070 assert export
.protocols
== [4]
1071 assert export
.transports
== ["TCP"]
1072 assert export
.fsal
.name
== "CEPH"
1073 assert export
.fsal
.user_id
== "nfs.foo.3"
1074 assert export
.fsal
.cephx_key
== "thekeyforclientabc"
1075 assert len(export
.clients
) == 1
1076 assert export
.clients
[0].squash
== 'root'
1077 assert export
.clients
[0].access_type
== 'rw'
1078 assert export
.clients
[0].addresses
== ["192.168.1.0/8"]
1079 assert export
.cluster_id
== self
.cluster_id
1081 def _do_test_cluster_ls(self
):
1082 nfs_mod
= Module('nfs', '', '')
1083 cluster
= NFSCluster(nfs_mod
)
1085 out
= cluster
.list_nfs_cluster()
1086 assert out
[0] == self
.cluster_id
1088 def test_cluster_ls(self
):
1089 self
._do
_mock
_test
(self
._do
_test
_cluster
_ls
)
1091 def _do_test_cluster_info(self
):
1092 nfs_mod
= Module('nfs', '', '')
1093 cluster
= NFSCluster(nfs_mod
)
1095 out
= cluster
.show_nfs_cluster_info(self
.cluster_id
)
1096 assert out
== {"foo": {"virtual_ip": None, "backend": []}}
1098 def test_cluster_info(self
):
1099 self
._do
_mock
_test
(self
._do
_test
_cluster
_info
)
1101 def _do_test_cluster_config(self
):
1102 nfs_mod
= Module('nfs', '', '')
1103 cluster
= NFSCluster(nfs_mod
)
1105 out
= cluster
.get_nfs_cluster_config(self
.cluster_id
)
1108 cluster
.set_nfs_cluster_config(self
.cluster_id
, '# foo\n')
1110 out
= cluster
.get_nfs_cluster_config(self
.cluster_id
)
1111 assert out
== "# foo\n"
1113 cluster
.reset_nfs_cluster_config(self
.cluster_id
)
1115 out
= cluster
.get_nfs_cluster_config(self
.cluster_id
)
1118 def test_cluster_config(self
):
1119 self
._do
_mock
_test
(self
._do
_test
_cluster
_config
)
1122 @pytest.mark
.parametrize(
1125 ("/foo/bar/baz", "/foo/bar/baz"),
1126 ("/foo/bar/baz/", "/foo/bar/baz"),
1127 ("/foo/bar/baz ", "/foo/bar/baz"),
1128 ("/foo/./bar/baz", "/foo/bar/baz"),
1129 ("/foo/bar/baz/..", "/foo/bar"),
1130 ("//foo/bar/baz", "/foo/bar/baz"),
1134 def test_normalize_path(path
, expected
):
1135 assert normalize_path(path
) == expected
1138 def test_ganesha_validate_squash():
1139 """Check error handling of internal validation function for squash value."""
1140 from nfs
.ganesha_conf
import _validate_squash
1141 from nfs
.exception
import NFSInvalidOperation
1143 _validate_squash("root")
1144 with pytest
.raises(NFSInvalidOperation
):
1145 _validate_squash("toot")
1148 def test_ganesha_validate_access_type():
1149 """Check error handling of internal validation function for access type value."""
1150 from nfs
.ganesha_conf
import _validate_access_type
1151 from nfs
.exception
import NFSInvalidOperation
1153 for ok
in ("rw", "ro", "none"):
1154 _validate_access_type(ok
)
1155 with pytest
.raises(NFSInvalidOperation
):
1156 _validate_access_type("any")