]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/nfs/tests/test_nfs.py
4 from typing
import Optional
, Tuple
, Iterator
, List
, Any
6 from contextlib
import contextmanager
7 from unittest
import mock
8 from unittest
.mock
import MagicMock
9 from mgr_module
import MgrModule
, NFS_POOL_NAME
11 from rados
import ObjectNotFound
13 from ceph
.deployment
.service_spec
import NFSServiceSpec
14 from nfs
import Module
15 from nfs
.export
import ExportMgr
, normalize_path
16 from nfs
.ganesha_conf
import GaneshaConfParser
, Export
, RawBlock
17 from nfs
.cluster
import NFSCluster
18 from orchestrator
import ServiceDescription
, DaemonDescription
, OrchResult
31 Attr_Expiration_Time = 0;
38 # Secret_Access_Key = "YOUR SECRET KEY HERE";
43 Clients = 192.168.0.10, 192.168.1.0/8;
49 Clients = 192.168.0.0/16;
63 squash = AllAnonymous;
65 Transports = TCP, UDP;
69 User_Id = "nfs.foo.bucket";
70 Access_Key_Id ="the_access_key";
71 Secret_Access_Key = "the_secret_key";
79 user_id = "nfs.foo.1";
81 secret_access_key = "AQCjU+hgjyReLBAAddJa0Dza/ZHqjX5+JiePMA==";
88 attr_expiration_time = 0;
89 security_label = true;
98 user_id = "nfs.foo.1";
100 secret_access_key = "AQCjU+hgjyReLBAAddJa0Dza/ZHqjX5+JiePMA==";
106 squash = "no_root_squash";
107 SecType = "krb5p", "krb5i";
108 attr_expiration_time = 0;
109 security_label = true;
116 %url "rados://{NFS_POOL_NAME}/{cluster_id}/export-1"
118 %url "rados://{NFS_POOL_NAME}/{cluster_id}/export-2"'''
120 class RObject(object):
121 def __init__(self
, key
: str, raw
: str) -> None:
def read(self, _: Optional[int]) -> bytes:
    # Mimics rados ioctx.read(): the size argument is accepted for API
    # compatibility but ignored — the full object body is always returned,
    # UTF-8 encoded.
    return self.raw.encode('utf-8')
def stat(self) -> Tuple[int, None]:
    # Mimics rados stat(): returns (size, mtime); mtime is not tracked by
    # this in-memory stand-in, so it is always None.
    return len(self.raw), None
131 def _ioctx_write_full_mock(self
, key
: str, content
: bytes
) -> None:
132 if key
not in self
.temp_store
[self
.temp_store_namespace
]:
133 self
.temp_store
[self
.temp_store_namespace
][key
] = \
134 TestNFS
.RObject(key
, content
.decode('utf-8'))
136 self
.temp_store
[self
.temp_store_namespace
][key
].raw
= content
.decode('utf-8')
def _ioctx_remove_mock(self, key: str) -> None:
    # Remove the object from the currently selected namespace.
    # NOTE(review): raises KeyError (not rados ObjectNotFound) if the key
    # is absent — callers in these tests only delete existing objects.
    del self.temp_store[self.temp_store_namespace][key]
141 def _ioctx_list_objects_mock(self
) -> List
['TestNFS.RObject']:
142 r
= [obj
for _
, obj
in self
.temp_store
[self
.temp_store_namespace
].items()]
def _ioctl_stat_mock(self, key: str) -> Tuple[int, None]:
    # Delegate to the stored RObject's stat(), which returns (size, None).
    return self.temp_store[self.temp_store_namespace][key].stat()
148 def _ioctl_read_mock(self
, key
: str, size
: Optional
[Any
] = None) -> bytes
:
149 if key
not in self
.temp_store
[self
.temp_store_namespace
]:
151 return self
.temp_store
[self
.temp_store_namespace
][key
].read(size
)
def _ioctx_set_namespace_mock(self, namespace: str) -> None:
    # Select which namespace the subsequent read/write/remove/list mocks
    # operate on (stands in for ioctx.set_namespace()).
    self.temp_store_namespace = namespace
156 def _reset_temp_store(self
) -> None:
157 self
.temp_store_namespace
= None
160 'export-1': TestNFS
.RObject("export-1", self
.export_1
),
161 'export-2': TestNFS
.RObject("export-2", self
.export_2
),
162 'conf-nfs.foo': TestNFS
.RObject("conf-nfs.foo", self
.conf_nfs_foo
)
167 def _mock_orchestrator(self
, enable
: bool) -> Iterator
:
168 self
.io_mock
= MagicMock()
169 self
.io_mock
.set_namespace
.side_effect
= self
._ioctx
_set
_namespace
_mock
170 self
.io_mock
.read
= self
._ioctl
_read
_mock
171 self
.io_mock
.stat
= self
._ioctl
_stat
_mock
172 self
.io_mock
.list_objects
.side_effect
= self
._ioctx
_list
_objects
_mock
173 self
.io_mock
.write_full
.side_effect
= self
._ioctx
_write
_full
_mock
174 self
.io_mock
.remove_object
.side_effect
= self
._ioctx
_remove
_mock
177 orch_nfs_services
= [
178 ServiceDescription(spec
=NFSServiceSpec(service_id
=self
.cluster_id
))
182 DaemonDescription('nfs', 'foo.mydaemon', 'myhostname')
185 def mock_exec(cls
, args
):
186 if args
[1:3] == ['bucket', 'stats']:
188 "owner": "bucket_owner_user",
190 return 0, json
.dumps(bucket_info
), ''
193 "display_name": "foo",
201 "access_key": "the_access_key",
202 "secret_key": "the_secret_key"
207 "op_mask": "read, write, delete",
208 "default_placement": "",
209 "default_storage_class": "",
210 "placement_tags": [],
213 "check_on_raw": False,
220 "check_on_raw": False,
229 if args
[2] == 'list':
230 return 0, json
.dumps([u
]), ''
231 return 0, json
.dumps(u
), ''
233 def mock_describe_service(cls
, *args
, **kwargs
):
234 if kwargs
['service_type'] == 'nfs':
235 return OrchResult(orch_nfs_services
)
236 return OrchResult([])
238 def mock_list_daemons(cls
, *args
, **kwargs
):
239 if kwargs
['daemon_type'] == 'nfs':
240 return OrchResult(orch_nfs_daemons
)
241 return OrchResult([])
243 with mock
.patch('nfs.module.Module.describe_service', mock_describe_service
) as describe_service
, \
244 mock
.patch('nfs.module.Module.list_daemons', mock_list_daemons
) as list_daemons
, \
245 mock
.patch('nfs.module.Module.rados') as rados
, \
246 mock
.patch('nfs.export.available_clusters',
247 return_value
=[self
.cluster_id
]), \
248 mock
.patch('nfs.export.restart_nfs_service'), \
249 mock
.patch('nfs.cluster.restart_nfs_service'), \
250 mock
.patch
.object(MgrModule
, 'tool_exec', mock_exec
), \
251 mock
.patch('nfs.export.check_fs', return_value
=True), \
252 mock
.patch('nfs.ganesha_conf.check_fs', return_value
=True), \
253 mock
.patch('nfs.export.ExportMgr._create_user_key',
254 return_value
='thekeyforclientabc'):
256 rados
.open_ioctx
.return_value
.__enter
__.return_value
= self
.io_mock
257 rados
.open_ioctx
.return_value
.__exit
__ = mock
.Mock(return_value
=None)
259 self
._reset
_temp
_store
()
263 def test_parse_daemon_raw_config(self
) -> None:
264 expected_daemon_config
= [
265 RawBlock('NFS_CORE_PARAM', values
={
267 "enable_rquota": False,
271 RawBlock('MDCACHE', values
={
274 RawBlock('NFSV4', values
={
275 "recoverybackend": "rados_cluster",
276 "minor_versions": [1, 2]
278 RawBlock('RADOS_KV', values
={
279 "pool": NFS_POOL_NAME
,
280 "namespace": "vstart",
284 RawBlock('RADOS_URLS', values
={
286 "watch_url": f
"'rados://{NFS_POOL_NAME}/vstart/conf-nfs.vstart'"
288 RawBlock('%url', values
={
289 "value": f
"rados://{NFS_POOL_NAME}/vstart/conf-nfs.vstart"
292 daemon_raw_config
= """
295 Enable_RQUOTA = false;
305 RecoveryBackend = rados_cluster;
306 Minor_Versions = 1, 2;
318 watch_url = 'rados://{}/vstart/conf-nfs.vstart';
321 %url rados://{}/vstart/conf-nfs.vstart
322 """.replace('{}', NFS_POOL_NAME
)
323 daemon_config
= GaneshaConfParser(daemon_raw_config
).parse()
324 assert daemon_config
== expected_daemon_config
def _validate_export_1(self, export: Export):
    """Assert that *export* matches the export-1 fixture (CephFS export, id 1)."""
    assert export.export_id == 1
    assert export.path == "/"
    assert export.pseudo == "/cephfs_a/"
    assert export.access_type == "RW"
    # assert export.squash == "root_squash"  # probably correct value
    assert export.squash == "no_root_squash"
    assert export.protocols == [4]
    # assert export.transports == {"TCP", "UDP"}
    assert export.fsal.name == "CEPH"
    assert export.fsal.user_id == "ganesha"
    assert export.fsal.fs_name == "a"
    # identity comparison for None (PEP 8 / E711), not ``== None``
    assert export.fsal.sec_label_xattr is None
    assert len(export.clients) == 2
    assert export.clients[0].addresses == \
        ["192.168.0.10", "192.168.1.0/8"]
    # assert export.clients[0].squash == "no_root_squash"  # probably correct value
    assert export.clients[0].squash == "None"
    assert export.clients[0].access_type is None
    assert export.clients[1].addresses == ["192.168.0.0/16"]
    # assert export.clients[1].squash == "all_squash"  # probably correct value
    assert export.clients[1].squash == "All"
    assert export.clients[1].access_type == "RO"
    assert export.cluster_id == 'foo'
    assert export.attr_expiration_time == 0
    # assert export.security_label == False  # probably correct value
    # pin the actual boolean with identity, not equality (E712)
    assert export.security_label is True
def test_export_parser_1(self) -> None:
    """The export-1 fixture parses into exactly one block that builds a valid Export."""
    parsed = GaneshaConfParser(self.export_1).parse()
    assert isinstance(parsed, list)
    assert len(parsed) == 1
    first_block = parsed[0]
    export = Export.from_export_block(first_block, self.cluster_id)
    self._validate_export_1(export)
def _validate_export_2(self, export: Export):
    """Assert that *export* matches the export-2 fixture (RGW export, id 2)."""
    assert export.export_id == 2
    assert export.path == "/"
    assert export.pseudo == "/rgw"
    assert export.access_type == "RW"
    # assert export.squash == "all_squash"  # probably correct value
    assert export.squash == "AllAnonymous"
    assert export.protocols == [4, 3]
    assert set(export.transports) == {"TCP", "UDP"}
    # the FSAL carries the RGW identity and credentials
    fsal = export.fsal
    assert fsal.name == "RGW"
    assert fsal.user_id == "nfs.foo.bucket"
    assert fsal.access_key_id == "the_access_key"
    assert fsal.secret_access_key == "the_secret_key"
    assert len(export.clients) == 0
    assert export.cluster_id == 'foo'
def test_export_parser_2(self) -> None:
    """The export-2 fixture parses into exactly one block that builds a valid Export."""
    parsed = GaneshaConfParser(self.export_2).parse()
    assert isinstance(parsed, list)
    assert len(parsed) == 1
    first_block = parsed[0]
    export = Export.from_export_block(first_block, self.cluster_id)
    self._validate_export_2(export)
def test_daemon_conf_parser(self) -> None:
    """The per-daemon config is two %url blocks pointing at this cluster's exports."""
    parsed = GaneshaConfParser(self.conf_nfs_foo).parse()
    assert isinstance(parsed, list)
    assert len(parsed) == 2
    # export objects are numbered export-1, export-2 in the RADOS pool
    for index, block in enumerate(parsed, start=1):
        assert block.block_name == "%url"
        expected_url = f"rados://{NFS_POOL_NAME}/{self.cluster_id}/export-{index}"
        assert block.values['value'] == expected_url
393 def _do_mock_test(self
, func
, *args
) -> None:
394 with self
._mock
_orchestrator
(True):
396 self
._reset
_temp
_store
()
def test_ganesha_conf(self) -> None:
    """Run _do_test_ganesha_conf inside the mocked orchestrator/rados environment."""
    self._do_mock_test(self._do_test_ganesha_conf)
def _do_test_ganesha_conf(self) -> None:
    """Both fixture exports are loaded by ExportMgr and validate field-by-field."""
    mgr_module = Module('nfs', '', '')
    export_mgr = ExportMgr(mgr_module)
    cluster_exports = export_mgr.exports[self.cluster_id]

    assert len(cluster_exports) == 2

    first = [e for e in cluster_exports if e.export_id == 1][0]
    second = [e for e in cluster_exports if e.export_id == 2][0]
    self._validate_export_1(first)
    self._validate_export_2(second)
def test_config_dict(self) -> None:
    """Run _do_test_config_dict inside the mocked orchestrator/rados environment."""
    self._do_mock_test(self._do_test_config_dict)
414 def _do_test_config_dict(self
) -> None:
415 nfs_mod
= Module('nfs', '', '')
416 conf
= ExportMgr(nfs_mod
)
417 export
= [e
for e
in conf
.exports
['foo'] if e
.export_id
== 1][0]
418 ex_dict
= export
.to_dict()
420 assert ex_dict
== {'access_type': 'RW',
421 'clients': [{'access_type': None,
422 'addresses': ['192.168.0.10', '192.168.1.0/8'],
424 {'access_type': 'RO',
425 'addresses': ['192.168.0.0/16'],
427 'cluster_id': self
.cluster_id
,
429 'fsal': {'fs_name': 'a', 'name': 'CEPH', 'user_id': 'ganesha'},
432 'pseudo': '/cephfs_a/',
433 'security_label': True,
434 'squash': 'no_root_squash',
437 export
= [e
for e
in conf
.exports
['foo'] if e
.export_id
== 2][0]
438 ex_dict
= export
.to_dict()
439 assert ex_dict
== {'access_type': 'RW',
441 'cluster_id': self
.cluster_id
,
443 'fsal': {'name': 'RGW',
444 'access_key_id': 'the_access_key',
445 'secret_access_key': 'the_secret_key',
446 'user_id': 'nfs.foo.bucket'},
450 'security_label': True,
451 'squash': 'AllAnonymous',
452 'transports': ['TCP', 'UDP']}
def test_config_from_dict(self) -> None:
    """Run _do_test_config_from_dict inside the mocked orchestrator/rados environment."""
    self._do_mock_test(self._do_test_config_from_dict)
457 def _do_test_config_from_dict(self
) -> None:
458 export
= Export
.from_dict(1, {
461 'cluster_id': self
.cluster_id
,
462 'pseudo': '/cephfs_a',
464 'squash': 'root_squash',
465 'security_label': True,
467 'transports': ['TCP', 'UDP'],
469 'addresses': ["192.168.0.10", "192.168.1.0/8"],
471 'squash': 'no_root_squash'
473 'addresses': ["192.168.0.0/16"],
475 'squash': 'all_squash'
479 'user_id': 'ganesha',
481 'sec_label_xattr': 'security.selinux'
485 assert export
.export_id
== 1
486 assert export
.path
== "/"
487 assert export
.pseudo
== "/cephfs_a"
488 assert export
.access_type
== "RW"
489 assert export
.squash
== "root_squash"
490 assert set(export
.protocols
) == {4}
491 assert set(export
.transports
) == {"TCP", "UDP"}
492 assert export
.fsal
.name
== "CEPH"
493 assert export
.fsal
.user_id
== "ganesha"
494 assert export
.fsal
.fs_name
== "a"
495 assert export
.fsal
.sec_label_xattr
== 'security.selinux'
496 assert len(export
.clients
) == 2
497 assert export
.clients
[0].addresses
== \
498 ["192.168.0.10", "192.168.1.0/8"]
499 assert export
.clients
[0].squash
== "no_root_squash"
500 assert export
.clients
[0].access_type
is None
501 assert export
.clients
[1].addresses
== ["192.168.0.0/16"]
502 assert export
.clients
[1].squash
== "all_squash"
503 assert export
.clients
[1].access_type
== "RO"
504 assert export
.cluster_id
== self
.cluster_id
505 assert export
.attr_expiration_time
== 0
506 assert export
.security_label
508 export
= Export
.from_dict(2, {
512 'cluster_id': self
.cluster_id
,
514 'squash': 'all_squash',
515 'security_label': False,
517 'transports': ['TCP', 'UDP'],
521 'user_id': 'rgw.foo.bucket',
522 'access_key_id': 'the_access_key',
523 'secret_access_key': 'the_secret_key'
527 assert export
.export_id
== 2
528 assert export
.path
== "bucket"
529 assert export
.pseudo
== "/rgw"
530 assert export
.access_type
== "RW"
531 assert export
.squash
== "all_squash"
532 assert set(export
.protocols
) == {4, 3}
533 assert set(export
.transports
) == {"TCP", "UDP"}
534 assert export
.fsal
.name
== "RGW"
535 assert export
.fsal
.user_id
== "rgw.foo.bucket"
536 assert export
.fsal
.access_key_id
== "the_access_key"
537 assert export
.fsal
.secret_access_key
== "the_secret_key"
538 assert len(export
.clients
) == 0
539 assert export
.cluster_id
== self
.cluster_id
541 @pytest.mark
.parametrize(
548 def test_export_from_to_export_block(self
, block
):
549 blocks
= GaneshaConfParser(block
).parse()
550 export
= Export
.from_export_block(blocks
[0], self
.cluster_id
)
551 newblock
= export
.to_export_block()
552 export2
= Export
.from_export_block(newblock
, self
.cluster_id
)
553 newblock2
= export2
.to_export_block()
554 assert newblock
== newblock2
556 @pytest.mark
.parametrize(
563 def test_export_from_to_dict(self
, block
):
564 blocks
= GaneshaConfParser(block
).parse()
565 export
= Export
.from_export_block(blocks
[0], self
.cluster_id
)
567 export2
= Export
.from_dict(j
['export_id'], j
)
568 j2
= export2
.to_dict()
571 @pytest.mark
.parametrize(
578 def test_export_validate(self
, block
):
579 blocks
= GaneshaConfParser(block
).parse()
580 export
= Export
.from_export_block(blocks
[0], self
.cluster_id
)
581 nfs_mod
= Module('nfs', '', '')
582 with mock
.patch('nfs.ganesha_conf.check_fs', return_value
=True):
583 export
.validate(nfs_mod
)
def test_update_export(self):
    """Run _do_test_update_export inside the mocked orchestrator/rados environment."""
    self._do_mock_test(self._do_test_update_export)
588 def _do_test_update_export(self
):
589 nfs_mod
= Module('nfs', '', '')
590 conf
= ExportMgr(nfs_mod
)
591 r
= conf
.apply_export(self
.cluster_id
, json
.dumps({
594 'pseudo': '/rgw/bucket',
595 'cluster_id': self
.cluster_id
,
597 'squash': 'all_squash',
598 'security_label': False,
600 'transports': ['TCP', 'UDP'],
602 'addresses': ["192.168.0.0/16"],
608 'user_id': 'nfs.foo.bucket',
609 'access_key_id': 'the_access_key',
610 'secret_access_key': 'the_secret_key',
615 export
= conf
._fetch
_export
('foo', '/rgw/bucket')
616 assert export
.export_id
== 2
617 assert export
.path
== "bucket"
618 assert export
.pseudo
== "/rgw/bucket"
619 assert export
.access_type
== "RW"
620 assert export
.squash
== "all_squash"
621 assert export
.protocols
== [4, 3]
622 assert export
.transports
== ["TCP", "UDP"]
623 assert export
.fsal
.name
== "RGW"
624 assert export
.fsal
.access_key_id
== "the_access_key"
625 assert export
.fsal
.secret_access_key
== "the_secret_key"
626 assert len(export
.clients
) == 1
627 assert export
.clients
[0].squash
is None
628 assert export
.clients
[0].access_type
is None
629 assert export
.cluster_id
== self
.cluster_id
631 # do it again, with changes
632 r
= conf
.apply_export(self
.cluster_id
, json
.dumps({
635 'pseudo': '/rgw/bucket',
636 'cluster_id': self
.cluster_id
,
639 'security_label': False,
641 'transports': ['TCP'],
643 'addresses': ["192.168.10.0/16"],
649 'user_id': 'nfs.foo.newbucket',
650 'access_key_id': 'the_access_key',
651 'secret_access_key': 'the_secret_key',
656 export
= conf
._fetch
_export
('foo', '/rgw/bucket')
657 assert export
.export_id
== 2
658 assert export
.path
== "newbucket"
659 assert export
.pseudo
== "/rgw/bucket"
660 assert export
.access_type
== "RO"
661 assert export
.squash
== "root"
662 assert export
.protocols
== [4]
663 assert export
.transports
== ["TCP"]
664 assert export
.fsal
.name
== "RGW"
665 assert export
.fsal
.access_key_id
== "the_access_key"
666 assert export
.fsal
.secret_access_key
== "the_secret_key"
667 assert len(export
.clients
) == 1
668 assert export
.clients
[0].squash
is None
669 assert export
.clients
[0].access_type
is None
670 assert export
.cluster_id
== self
.cluster_id
672 # again, but without export_id
673 r
= conf
.apply_export(self
.cluster_id
, json
.dumps({
674 'path': 'newestbucket',
675 'pseudo': '/rgw/bucket',
676 'cluster_id': self
.cluster_id
,
679 'security_label': False,
681 'transports': ['TCP'],
683 'addresses': ["192.168.10.0/16"],
689 'user_id': 'nfs.foo.newestbucket',
690 'access_key_id': 'the_access_key',
691 'secret_access_key': 'the_secret_key',
696 export
= conf
._fetch
_export
(self
.cluster_id
, '/rgw/bucket')
697 assert export
.export_id
== 2
698 assert export
.path
== "newestbucket"
699 assert export
.pseudo
== "/rgw/bucket"
700 assert export
.access_type
== "RW"
701 assert export
.squash
== "root"
702 assert export
.protocols
== [4]
703 assert export
.transports
== ["TCP"]
704 assert export
.fsal
.name
== "RGW"
705 assert export
.fsal
.access_key_id
== "the_access_key"
706 assert export
.fsal
.secret_access_key
== "the_secret_key"
707 assert len(export
.clients
) == 1
708 assert export
.clients
[0].squash
is None
709 assert export
.clients
[0].access_type
is None
710 assert export
.cluster_id
== self
.cluster_id
def test_update_export_sectype(self):
    """Run _test_update_export_sectype inside the mocked orchestrator/rados environment."""
    self._do_mock_test(self._test_update_export_sectype)
715 def _test_update_export_sectype(self
):
716 nfs_mod
= Module('nfs', '', '')
717 conf
= ExportMgr(nfs_mod
)
718 r
= conf
.apply_export(self
.cluster_id
, json
.dumps({
721 'pseudo': '/rgw/bucket',
722 'cluster_id': self
.cluster_id
,
724 'squash': 'all_squash',
725 'security_label': False,
727 'transports': ['TCP', 'UDP'],
729 'addresses': ["192.168.0.0/16"],
735 'user_id': 'nfs.foo.bucket',
736 'access_key_id': 'the_access_key',
737 'secret_access_key': 'the_secret_key',
742 # no sectype was given, key not present
743 info
= conf
._get
_export
_dict
(self
.cluster_id
, "/rgw/bucket")
744 assert info
["export_id"] == 2
745 assert info
["path"] == "bucket"
746 assert "sectype" not in info
748 r
= conf
.apply_export(self
.cluster_id
, json
.dumps({
751 'pseudo': '/rgw/bucket',
752 'cluster_id': self
.cluster_id
,
754 'squash': 'all_squash',
755 'security_label': False,
757 'transports': ['TCP', 'UDP'],
759 'addresses': ["192.168.0.0/16"],
763 'sectype': ["krb5p", "krb5i", "sys"],
766 'user_id': 'nfs.foo.bucket',
767 'access_key_id': 'the_access_key',
768 'secret_access_key': 'the_secret_key',
773 # assert sectype matches new value(s)
774 info
= conf
._get
_export
_dict
(self
.cluster_id
, "/rgw/bucket")
775 assert info
["export_id"] == 2
776 assert info
["path"] == "bucket"
777 assert info
["sectype"] == ["krb5p", "krb5i", "sys"]
def test_update_export_with_ganesha_conf(self):
    """Run _do_test_update_export_with_ganesha_conf inside the mocked environment."""
    self._do_mock_test(self._do_test_update_export_with_ganesha_conf)
782 def _do_test_update_export_with_ganesha_conf(self
):
783 nfs_mod
= Module('nfs', '', '')
784 conf
= ExportMgr(nfs_mod
)
785 r
= conf
.apply_export(self
.cluster_id
, self
.export_3
)
788 def test_update_export_with_ganesha_conf_sectype(self
):
790 self
._do
_test
_update
_export
_with
_ganesha
_conf
_sectype
,
791 self
.export_4
, ["krb5p", "krb5i"])
793 def test_update_export_with_ganesha_conf_sectype_lcase(self
):
794 export_conf
= self
.export_4
.replace("SecType", "sectype").replace("krb5i", "sys")
796 self
._do
_test
_update
_export
_with
_ganesha
_conf
_sectype
,
797 export_conf
, ["krb5p", "sys"])
799 def _do_test_update_export_with_ganesha_conf_sectype(self
, export_conf
, expect_sectype
):
800 nfs_mod
= Module('nfs', '', '')
801 conf
= ExportMgr(nfs_mod
)
802 r
= conf
.apply_export(self
.cluster_id
, export_conf
)
805 # assert sectype matches new value(s)
806 info
= conf
._get
_export
_dict
(self
.cluster_id
, "/secure1")
807 assert info
["export_id"] == 1
808 assert info
["path"] == "/secure/me"
809 assert info
["sectype"] == expect_sectype
def test_update_export_with_list(self):
    """Run _do_test_update_export_with_list inside the mocked environment."""
    self._do_mock_test(self._do_test_update_export_with_list)
814 def _do_test_update_export_with_list(self
):
815 nfs_mod
= Module('nfs', '', '')
816 conf
= ExportMgr(nfs_mod
)
817 r
= conf
.apply_export(self
.cluster_id
, json
.dumps([
820 'pseudo': '/rgw/bucket',
821 'cluster_id': self
.cluster_id
,
824 'security_label': False,
826 'transports': ['TCP'],
828 'addresses': ["192.168.0.0/16"],
834 'user_id': 'nfs.foo.bucket',
835 'access_key_id': 'the_access_key',
836 'secret_access_key': 'the_secret_key',
841 'pseudo': '/rgw/bucket2',
842 'cluster_id': self
.cluster_id
,
845 'security_label': False,
847 'transports': ['TCP'],
849 'addresses': ["192.168.0.0/16"],
855 'user_id': 'nfs.foo.bucket2',
856 'access_key_id': 'the_access_key',
857 'secret_access_key': 'the_secret_key',
863 export
= conf
._fetch
_export
('foo', '/rgw/bucket')
864 assert export
.export_id
== 3
865 assert export
.path
== "bucket"
866 assert export
.pseudo
== "/rgw/bucket"
867 assert export
.access_type
== "RW"
868 assert export
.squash
== "root"
869 assert export
.protocols
== [4]
870 assert export
.transports
== ["TCP"]
871 assert export
.fsal
.name
== "RGW"
872 assert export
.fsal
.access_key_id
== "the_access_key"
873 assert export
.fsal
.secret_access_key
== "the_secret_key"
874 assert len(export
.clients
) == 1
875 assert export
.clients
[0].squash
is None
876 assert export
.clients
[0].access_type
is None
877 assert export
.cluster_id
== self
.cluster_id
879 export
= conf
._fetch
_export
('foo', '/rgw/bucket2')
880 assert export
.export_id
== 4
881 assert export
.path
== "bucket2"
882 assert export
.pseudo
== "/rgw/bucket2"
883 assert export
.access_type
== "RO"
884 assert export
.squash
== "root"
885 assert export
.protocols
== [4]
886 assert export
.transports
== ["TCP"]
887 assert export
.fsal
.name
== "RGW"
888 assert export
.fsal
.access_key_id
== "the_access_key"
889 assert export
.fsal
.secret_access_key
== "the_secret_key"
890 assert len(export
.clients
) == 1
891 assert export
.clients
[0].squash
is None
892 assert export
.clients
[0].access_type
is None
893 assert export
.cluster_id
== self
.cluster_id
def test_remove_export(self) -> None:
    """Run _do_test_remove_export inside the mocked orchestrator/rados environment."""
    self._do_mock_test(self._do_test_remove_export)
def _do_test_remove_export(self) -> None:
    """Deleting the /rgw export leaves only the CephFS export (id 1) behind."""
    module = Module('nfs', '', '')
    export_mgr = ExportMgr(module)
    assert len(export_mgr.exports[self.cluster_id]) == 2
    result = export_mgr.delete_export(cluster_id=self.cluster_id,
                                      pseudo_path="/rgw")
    assert result == (0, "Successfully deleted export", "")
    remaining = export_mgr.exports[self.cluster_id]
    assert len(remaining) == 1
    assert remaining[0].export_id == 1
def test_create_export_rgw_bucket(self):
    """Run _do_test_create_export_rgw_bucket inside the mocked environment."""
    self._do_mock_test(self._do_test_create_export_rgw_bucket)
911 def _do_test_create_export_rgw_bucket(self
):
912 nfs_mod
= Module('nfs', '', '')
913 conf
= ExportMgr(nfs_mod
)
915 exports
= conf
.list_exports(cluster_id
=self
.cluster_id
)
916 ls
= json
.loads(exports
[1])
919 r
= conf
.create_export(
921 cluster_id
=self
.cluster_id
,
923 pseudo_path
='/mybucket',
926 addr
=["192.168.0.0/16"]
930 exports
= conf
.list_exports(cluster_id
=self
.cluster_id
)
931 ls
= json
.loads(exports
[1])
934 export
= conf
._fetch
_export
('foo', '/mybucket')
935 assert export
.export_id
936 assert export
.path
== "bucket"
937 assert export
.pseudo
== "/mybucket"
938 assert export
.access_type
== "none"
939 assert export
.squash
== "none"
940 assert export
.protocols
== [4]
941 assert export
.transports
== ["TCP"]
942 assert export
.fsal
.name
== "RGW"
943 assert export
.fsal
.user_id
== "bucket_owner_user"
944 assert export
.fsal
.access_key_id
== "the_access_key"
945 assert export
.fsal
.secret_access_key
== "the_secret_key"
946 assert len(export
.clients
) == 1
947 assert export
.clients
[0].squash
== 'root'
948 assert export
.clients
[0].access_type
== 'rw'
949 assert export
.clients
[0].addresses
== ["192.168.0.0/16"]
950 assert export
.cluster_id
== self
.cluster_id
def test_create_export_rgw_bucket_user(self):
    """Run _do_test_create_export_rgw_bucket_user inside the mocked environment."""
    self._do_mock_test(self._do_test_create_export_rgw_bucket_user)
955 def _do_test_create_export_rgw_bucket_user(self
):
956 nfs_mod
= Module('nfs', '', '')
957 conf
= ExportMgr(nfs_mod
)
959 exports
= conf
.list_exports(cluster_id
=self
.cluster_id
)
960 ls
= json
.loads(exports
[1])
963 r
= conf
.create_export(
965 cluster_id
=self
.cluster_id
,
967 user_id
='other_user',
968 pseudo_path
='/mybucket',
971 addr
=["192.168.0.0/16"]
975 exports
= conf
.list_exports(cluster_id
=self
.cluster_id
)
976 ls
= json
.loads(exports
[1])
979 export
= conf
._fetch
_export
('foo', '/mybucket')
980 assert export
.export_id
981 assert export
.path
== "bucket"
982 assert export
.pseudo
== "/mybucket"
983 assert export
.access_type
== "none"
984 assert export
.squash
== "none"
985 assert export
.protocols
== [4]
986 assert export
.transports
== ["TCP"]
987 assert export
.fsal
.name
== "RGW"
988 assert export
.fsal
.access_key_id
== "the_access_key"
989 assert export
.fsal
.secret_access_key
== "the_secret_key"
990 assert len(export
.clients
) == 1
991 assert export
.clients
[0].squash
== 'root'
992 assert export
.fsal
.user_id
== "other_user"
993 assert export
.clients
[0].access_type
== 'rw'
994 assert export
.clients
[0].addresses
== ["192.168.0.0/16"]
995 assert export
.cluster_id
== self
.cluster_id
def test_create_export_rgw_user(self):
    """Run _do_test_create_export_rgw_user inside the mocked environment."""
    self._do_mock_test(self._do_test_create_export_rgw_user)
1000 def _do_test_create_export_rgw_user(self
):
1001 nfs_mod
= Module('nfs', '', '')
1002 conf
= ExportMgr(nfs_mod
)
1004 exports
= conf
.list_exports(cluster_id
=self
.cluster_id
)
1005 ls
= json
.loads(exports
[1])
1008 r
= conf
.create_export(
1010 cluster_id
=self
.cluster_id
,
1011 user_id
='some_user',
1012 pseudo_path
='/mybucket',
1015 addr
=["192.168.0.0/16"]
1019 exports
= conf
.list_exports(cluster_id
=self
.cluster_id
)
1020 ls
= json
.loads(exports
[1])
1023 export
= conf
._fetch
_export
('foo', '/mybucket')
1024 assert export
.export_id
1025 assert export
.path
== "/"
1026 assert export
.pseudo
== "/mybucket"
1027 assert export
.access_type
== "none"
1028 assert export
.squash
== "none"
1029 assert export
.protocols
== [4]
1030 assert export
.transports
== ["TCP"]
1031 assert export
.fsal
.name
== "RGW"
1032 assert export
.fsal
.access_key_id
== "the_access_key"
1033 assert export
.fsal
.secret_access_key
== "the_secret_key"
1034 assert len(export
.clients
) == 1
1035 assert export
.clients
[0].squash
== 'root'
1036 assert export
.fsal
.user_id
== "some_user"
1037 assert export
.clients
[0].access_type
== 'rw'
1038 assert export
.clients
[0].addresses
== ["192.168.0.0/16"]
1039 assert export
.cluster_id
== self
.cluster_id
def test_create_export_cephfs(self):
    """Run _do_test_create_export_cephfs inside the mocked environment."""
    self._do_mock_test(self._do_test_create_export_cephfs)
1044 def _do_test_create_export_cephfs(self
):
1045 nfs_mod
= Module('nfs', '', '')
1046 conf
= ExportMgr(nfs_mod
)
1048 exports
= conf
.list_exports(cluster_id
=self
.cluster_id
)
1049 ls
= json
.loads(exports
[1])
1052 r
= conf
.create_export(
1054 cluster_id
=self
.cluster_id
,
1057 pseudo_path
='/cephfs2',
1060 addr
=["192.168.1.0/8"],
1064 exports
= conf
.list_exports(cluster_id
=self
.cluster_id
)
1065 ls
= json
.loads(exports
[1])
1068 export
= conf
._fetch
_export
('foo', '/cephfs2')
1069 assert export
.export_id
1070 assert export
.path
== "/"
1071 assert export
.pseudo
== "/cephfs2"
1072 assert export
.access_type
== "none"
1073 assert export
.squash
== "none"
1074 assert export
.protocols
== [4]
1075 assert export
.transports
== ["TCP"]
1076 assert export
.fsal
.name
== "CEPH"
1077 assert export
.fsal
.user_id
== "nfs.foo.3"
1078 assert export
.fsal
.cephx_key
== "thekeyforclientabc"
1079 assert len(export
.clients
) == 1
1080 assert export
.clients
[0].squash
== 'root'
1081 assert export
.clients
[0].access_type
== 'rw'
1082 assert export
.clients
[0].addresses
== ["192.168.1.0/8"]
1083 assert export
.cluster_id
== self
.cluster_id
1085 def _do_test_cluster_ls(self
):
1086 nfs_mod
= Module('nfs', '', '')
1087 cluster
= NFSCluster(nfs_mod
)
1089 rc
, out
, err
= cluster
.list_nfs_cluster()
1091 assert out
== self
.cluster_id
def test_cluster_ls(self):
    """Run _do_test_cluster_ls inside the mocked orchestrator/rados environment."""
    self._do_mock_test(self._do_test_cluster_ls)
1096 def _do_test_cluster_info(self
):
1097 nfs_mod
= Module('nfs', '', '')
1098 cluster
= NFSCluster(nfs_mod
)
1100 rc
, out
, err
= cluster
.show_nfs_cluster_info(self
.cluster_id
)
1102 assert json
.loads(out
) == {"foo": {"virtual_ip": None, "backend": []}}
def test_cluster_info(self):
    """Run _do_test_cluster_info inside the mocked orchestrator/rados environment."""
    self._do_mock_test(self._do_test_cluster_info)
1107 def _do_test_cluster_config(self
):
1108 nfs_mod
= Module('nfs', '', '')
1109 cluster
= NFSCluster(nfs_mod
)
1111 rc
, out
, err
= cluster
.get_nfs_cluster_config(self
.cluster_id
)
1115 rc
, out
, err
= cluster
.set_nfs_cluster_config(self
.cluster_id
, '# foo\n')
1118 rc
, out
, err
= cluster
.get_nfs_cluster_config(self
.cluster_id
)
1120 assert out
== "# foo\n"
1122 rc
, out
, err
= cluster
.reset_nfs_cluster_config(self
.cluster_id
)
1125 rc
, out
, err
= cluster
.get_nfs_cluster_config(self
.cluster_id
)
def test_cluster_config(self):
    """Run _do_test_cluster_config inside the mocked orchestrator/rados environment."""
    self._do_mock_test(self._do_test_cluster_config)
1133 @pytest.mark
.parametrize(
1136 ("/foo/bar/baz", "/foo/bar/baz"),
1137 ("/foo/bar/baz/", "/foo/bar/baz"),
1138 ("/foo/bar/baz ", "/foo/bar/baz"),
1139 ("/foo/./bar/baz", "/foo/bar/baz"),
1140 ("/foo/bar/baz/..", "/foo/bar"),
1141 ("//foo/bar/baz", "/foo/bar/baz"),
def test_normalize_path(path, expected):
    # Parametrized with (path, expected) pairs: trailing slash/space, '.',
    # '..' and doubled '/' must all collapse to the canonical form.
    assert normalize_path(path) == expected
def test_ganesha_validate_squash():
    """Check error handling of internal validation function for squash value."""
    from nfs.ganesha_conf import _validate_squash
    from nfs.exception import NFSInvalidOperation

    # a recognized squash value passes silently
    _validate_squash("root")
    # an unrecognized value must raise the NFS-specific exception
    with pytest.raises(NFSInvalidOperation):
        _validate_squash("toot")
def test_ganesha_validate_access_type():
    """Check error handling of internal validation function for access type value."""
    from nfs.ganesha_conf import _validate_access_type
    from nfs.exception import NFSInvalidOperation

    # every supported access type is accepted without raising
    for permitted in ("rw", "ro", "none"):
        _validate_access_type(permitted)
    # anything else is rejected with the NFS-specific exception
    with pytest.raises(NFSInvalidOperation):
        _validate_access_type("any")