]>
git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/mgr/dashboard/test_ganesha.py
1 # -*- coding: utf-8 -*-
2 # pylint: disable=too-many-public-methods
4 from __future__
import absolute_import
6 from .helper
import DashboardTestCase
, JList
, JObj
9 class GaneshaTest(DashboardTestCase
):
11 AUTH_ROLES
= ['pool-manager', 'ganesha-manager']
15 super(GaneshaTest
, cls
).setUpClass()
16 cls
.create_pool('ganesha', 2**2, 'replicated')
17 cls
._rados
_cmd
(['-p', 'ganesha', '-N', 'ganesha1', 'create', 'conf-node1'])
18 cls
._rados
_cmd
(['-p', 'ganesha', '-N', 'ganesha1', 'create', 'conf-node2'])
19 cls
._rados
_cmd
(['-p', 'ganesha', '-N', 'ganesha1', 'create', 'conf-node3'])
20 cls
._rados
_cmd
(['-p', 'ganesha', '-N', 'ganesha2', 'create', 'conf-node1'])
21 cls
._rados
_cmd
(['-p', 'ganesha', '-N', 'ganesha2', 'create', 'conf-node2'])
22 cls
._rados
_cmd
(['-p', 'ganesha', '-N', 'ganesha2', 'create', 'conf-node3'])
23 cls
._ceph
_cmd
(['dashboard', 'set-ganesha-clusters-rados-pool-namespace',
24 'cluster1:ganesha/ganesha1,cluster2:ganesha/ganesha2'])
27 cls
._radosgw
_admin
_cmd
([
28 'user', 'create', '--uid', 'admin', '--display-name', 'admin',
29 '--system', '--access-key', 'admin', '--secret', 'admin'
31 cls
._ceph
_cmd
_with
_secret
(['dashboard', 'set-rgw-api-secret-key'], 'admin')
32 cls
._ceph
_cmd
_with
_secret
(['dashboard', 'set-rgw-api-access-key'], 'admin')
35 def tearDownClass(cls
):
36 super(GaneshaTest
, cls
).tearDownClass()
37 cls
._radosgw
_admin
_cmd
(['user', 'rm', '--uid', 'admin', '--purge-data'])
38 cls
._ceph
_cmd
(['osd', 'pool', 'delete', 'ganesha', 'ganesha',
39 '--yes-i-really-really-mean-it'])
41 @DashboardTestCase.RunAs('test', 'test', [{'rbd-image': ['create', 'update', 'delete']}])
42 def test_read_access_permissions(self
):
43 self
._get
('/api/nfs-ganesha/export')
44 self
.assertStatus(403)
46 def test_list_daemons(self
):
47 daemons
= self
._get
("/api/nfs-ganesha/daemon")
48 self
.assertEqual(len(daemons
), 6)
49 daemons
= [(d
['daemon_id'], d
['cluster_id']) for d
in daemons
]
50 self
.assertIn(('node1', 'cluster1'), daemons
)
51 self
.assertIn(('node2', 'cluster1'), daemons
)
52 self
.assertIn(('node3', 'cluster1'), daemons
)
53 self
.assertIn(('node1', 'cluster2'), daemons
)
54 self
.assertIn(('node2', 'cluster2'), daemons
)
55 self
.assertIn(('node3', 'cluster2'), daemons
)
58 def create_export(cls
, path
, cluster_id
, daemons
, fsal
, sec_label_xattr
=None):
60 fsal
= {"name": "CEPH", "user_id": "admin", "fs_name": None,
61 "sec_label_xattr": sec_label_xattr
}
62 pseudo
= "/cephfs{}".format(path
)
64 fsal
= {"name": "RGW", "rgw_user_id": "admin"}
65 pseudo
= "/rgw/{}".format(path
if path
[0] != '/' else "")
69 "cluster_id": cluster_id
,
74 "squash": "no_root_squash",
75 "security_label": sec_label_xattr
is not None,
77 "transports": ["TCP"],
79 "addresses": ["10.0.0.0/8"],
84 return cls
._task
_post
('/api/nfs-ganesha/export', ex_json
)
87 super(GaneshaTest
, self
).tearDown()
88 exports
= self
._get
("/api/nfs-ganesha/export")
89 if self
._resp
.status_code
!= 200:
91 self
.assertIsInstance(exports
, list)
93 self
._task
_delete
("/api/nfs-ganesha/export/{}/{}"
94 .format(exp
['cluster_id'], exp
['export_id']))
96 def _test_create_export(self
, cephfs_path
):
97 exports
= self
._get
("/api/nfs-ganesha/export")
98 self
.assertEqual(len(exports
), 0)
100 data
= self
.create_export(cephfs_path
, 'cluster1', ['node1', 'node2'], 'CEPH',
103 exports
= self
._get
("/api/nfs-ganesha/export")
104 self
.assertEqual(len(exports
), 1)
105 self
.assertDictEqual(exports
[0], data
)
108 def test_create_export(self
):
109 self
._test
_create
_export
('/foo')
111 def test_create_export_for_cephfs_root(self
):
112 self
._test
_create
_export
('/')
114 def test_update_export(self
):
115 export
= self
._test
_create
_export
('/foo')
116 export
['access_type'] = 'RO'
117 export
['daemons'] = ['node1', 'node3']
118 export
['security_label'] = True
119 data
= self
._task
_put
('/api/nfs-ganesha/export/{}/{}'
120 .format(export
['cluster_id'], export
['export_id']),
122 exports
= self
._get
("/api/nfs-ganesha/export")
123 self
.assertEqual(len(exports
), 1)
124 self
.assertDictEqual(exports
[0], data
)
125 self
.assertEqual(exports
[0]['daemons'], ['node1', 'node3'])
126 self
.assertEqual(exports
[0]['security_label'], True)
128 def test_delete_export(self
):
129 export
= self
._test
_create
_export
('/foo')
130 self
._task
_delete
("/api/nfs-ganesha/export/{}/{}"
131 .format(export
['cluster_id'], export
['export_id']))
132 self
.assertStatus(204)
134 def test_get_export(self
):
135 exports
= self
._get
("/api/nfs-ganesha/export")
136 self
.assertEqual(len(exports
), 0)
138 data1
= self
.create_export("/foo", 'cluster2', ['node1', 'node2'], 'CEPH')
139 data2
= self
.create_export("mybucket", 'cluster2', ['node2', 'node3'], 'RGW')
141 export1
= self
._get
("/api/nfs-ganesha/export/cluster2/1")
142 self
.assertDictEqual(export1
, data1
)
144 export2
= self
._get
("/api/nfs-ganesha/export/cluster2/2")
145 self
.assertDictEqual(export2
, data2
)
147 def test_invalid_status(self
):
148 self
._ceph
_cmd
(['dashboard', 'set-ganesha-clusters-rados-pool-namespace', ''])
150 data
= self
._get
('/api/nfs-ganesha/status')
151 self
.assertStatus(200)
152 self
.assertIn('available', data
)
153 self
.assertIn('message', data
)
154 self
.assertFalse(data
['available'])
155 self
.assertIn(("NFS-Ganesha cluster is not detected. "
156 "Please set the GANESHA_RADOS_POOL_NAMESPACE "
157 "setting or deploy an NFS-Ganesha cluster with the Orchestrator."),
160 self
._ceph
_cmd
(['dashboard', 'set-ganesha-clusters-rados-pool-namespace',
161 'cluster1:ganesha/ganesha1,cluster2:ganesha/ganesha2'])
163 def test_valid_status(self
):
164 data
= self
._get
('/api/nfs-ganesha/status')
165 self
.assertStatus(200)
166 self
.assertIn('available', data
)
167 self
.assertIn('message', data
)
168 self
.assertTrue(data
['available'])
170 def test_ganesha_fsals(self
):
171 data
= self
._get
('/ui-api/nfs-ganesha/fsals')
172 self
.assertStatus(200)
173 self
.assertIn('CEPH', data
)
175 def test_ganesha_filesystems(self
):
176 data
= self
._get
('/ui-api/nfs-ganesha/cephfs/filesystems')
177 self
.assertStatus(200)
178 self
.assertSchema(data
, JList(JObj({
183 def test_ganesha_lsdir(self
):
184 fss
= self
._get
('/ui-api/nfs-ganesha/cephfs/filesystems')
185 self
.assertStatus(200)
187 data
= self
._get
('/ui-api/nfs-ganesha/lsdir/{}'.format(fs
['name']))
188 self
.assertStatus(200)
189 self
.assertSchema(data
, JObj({'paths': JList(str)}))
190 self
.assertEqual(data
['paths'][0], '/')
192 def test_ganesha_buckets(self
):
193 data
= self
._get
('/ui-api/nfs-ganesha/rgw/buckets')
194 self
.assertStatus(200)
196 self
.assertSchema(data
, schema
)
198 def test_ganesha_clusters(self
):
199 data
= self
._get
('/ui-api/nfs-ganesha/clusters')
200 self
.assertStatus(200)
202 self
.assertSchema(data
, schema
)
204 def test_ganesha_cephx_clients(self
):
205 data
= self
._get
('/ui-api/nfs-ganesha/cephx/clients')
206 self
.assertStatus(200)
208 self
.assertSchema(data
, schema
)