git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/mgr/dashboard/test_ganesha.py
bump version to 16.2.6-pve2
[ceph.git] / ceph / qa / tasks / mgr / dashboard / test_ganesha.py
1 # -*- coding: utf-8 -*-
2 # pylint: disable=too-many-public-methods
3
4 from __future__ import absolute_import
5
6 from .helper import DashboardTestCase, JList, JObj
7
8
class GaneshaTest(DashboardTestCase):
    """Dashboard REST API tests for the NFS-Ganesha endpoints.

    Covers ``/api/nfs-ganesha/*`` and ``/ui-api/nfs-ganesha/*``.  The class
    fixture creates a ``ganesha`` pool with ``conf-node1`` .. ``conf-node3``
    objects in the RADOS namespaces ``ganesha1`` and ``ganesha2`` and points
    the dashboard at them as ``cluster1`` and ``cluster2``.
    """

    CEPHFS = True
    AUTH_ROLES = ['pool-manager', 'ganesha-manager']

    # Value passed to 'dashboard set-ganesha-clusters-rados-pool-namespace'.
    # Shared by setUpClass and by test_invalid_status, which has to put the
    # setting back after clearing it.
    _CLUSTERS_POOL_NAMESPACE = 'cluster1:ganesha/ganesha1,cluster2:ganesha/ganesha2'

    @classmethod
    def setUpClass(cls):
        """Create the pool, the config objects, the cluster mapping and the RGW user."""
        super(GaneshaTest, cls).setUpClass()
        cls.create_pool('ganesha', 2**2, 'replicated')
        # One 'conf-<node>' object per node and namespace; test_list_daemons
        # later expects node1..node3 to be reported for both clusters.
        for namespace in ('ganesha1', 'ganesha2'):
            for node in ('node1', 'node2', 'node3'):
                cls._rados_cmd(['-p', 'ganesha', '-N', namespace,
                                'create', 'conf-{}'.format(node)])
        cls._ceph_cmd(['dashboard', 'set-ganesha-clusters-rados-pool-namespace',
                       cls._CLUSTERS_POOL_NAMESPACE])

        # RGW setup
        cls._radosgw_admin_cmd([
            'user', 'create', '--uid', 'admin', '--display-name', 'admin',
            '--system', '--access-key', 'admin', '--secret', 'admin'
        ])
        cls._ceph_cmd_with_secret(['dashboard', 'set-rgw-api-secret-key'], 'admin')
        cls._ceph_cmd_with_secret(['dashboard', 'set-rgw-api-access-key'], 'admin')

    @classmethod
    def tearDownClass(cls):
        """Remove the RGW ``admin`` user and the ``ganesha`` pool made by setUpClass."""
        super(GaneshaTest, cls).tearDownClass()
        cls._radosgw_admin_cmd(['user', 'rm', '--uid', 'admin', '--purge-data'])
        cls._ceph_cmd(['osd', 'pool', 'delete', 'ganesha', 'ganesha',
                       '--yes-i-really-really-mean-it'])

    @DashboardTestCase.RunAs('test', 'test', [{'rbd-image': ['create', 'update', 'delete']}])
    def test_read_access_permissions(self):
        """Listing exports as a user with only rbd-image permissions yields 403."""
        self._get('/api/nfs-ganesha/export')
        self.assertStatus(403)

    def test_list_daemons(self):
        """The daemon list holds node1..node3 for both cluster1 and cluster2."""
        daemons = self._get("/api/nfs-ganesha/daemon")
        self.assertEqual(len(daemons), 6)
        daemons = [(d['daemon_id'], d['cluster_id']) for d in daemons]
        self.assertIn(('node1', 'cluster1'), daemons)
        self.assertIn(('node2', 'cluster1'), daemons)
        self.assertIn(('node3', 'cluster1'), daemons)
        self.assertIn(('node1', 'cluster2'), daemons)
        self.assertIn(('node2', 'cluster2'), daemons)
        self.assertIn(('node3', 'cluster2'), daemons)

    @classmethod
    def create_export(cls, path, cluster_id, daemons, fsal, sec_label_xattr=None):
        """POST a new export to ``/api/nfs-ganesha/export``.

        :param path: export path, e.g. ``/foo`` for CEPH or ``mybucket`` for RGW
        :param cluster_id: cluster the export is created in
        :param daemons: list of daemon ids serving the export
        :param fsal: ``'CEPH'`` builds a CEPH FSAL section; any other value
            builds an RGW one
        :param sec_label_xattr: xattr name placed in the CEPH FSAL section;
            ``security_label`` is sent as true only when this is not None
        :return: whatever ``_task_post`` returns (the tests compare it with
            the export as returned by a subsequent GET)
        """
        if fsal == 'CEPH':
            fsal = {"name": "CEPH", "user_id": "admin", "fs_name": None,
                    "sec_label_xattr": sec_label_xattr}
            pseudo = "/cephfs{}".format(path)
        else:
            fsal = {"name": "RGW", "rgw_user_id": "admin"}
            # startswith() instead of path[0] so an empty path does not raise
            # IndexError; an absolute path maps to the bare "/rgw/" pseudo.
            pseudo = "/rgw/{}".format(path if not path.startswith('/') else "")
        ex_json = {
            "path": path,
            "fsal": fsal,
            "cluster_id": cluster_id,
            "daemons": daemons,
            "pseudo": pseudo,
            "tag": None,
            "access_type": "RW",
            "squash": "no_root_squash",
            "security_label": sec_label_xattr is not None,
            "protocols": [4],
            "transports": ["TCP"],
            "clients": [{
                "addresses": ["10.0.0.0/8"],
                "access_type": "RO",
                "squash": "root"
            }]
        }
        return cls._task_post('/api/nfs-ganesha/export', ex_json)

    def tearDown(self):
        """Delete every export still listed after a test."""
        super(GaneshaTest, self).tearDown()
        exports = self._get("/api/nfs-ganesha/export")
        if self._resp.status_code != 200:
            # The list could not be fetched, so there is nothing to iterate.
            return
        self.assertIsInstance(exports, list)
        for exp in exports:
            self._task_delete("/api/nfs-ganesha/export/{}/{}"
                              .format(exp['cluster_id'], exp['export_id']))

    def _test_create_export(self, cephfs_path):
        """Create one CEPH export on cluster1 and check it is the only one listed.

        :param cephfs_path: path to export
        :return: the data returned by :meth:`create_export`
        """
        exports = self._get("/api/nfs-ganesha/export")
        self.assertEqual(len(exports), 0)

        data = self.create_export(cephfs_path, 'cluster1', ['node1', 'node2'], 'CEPH',
                                  "security.selinux")

        exports = self._get("/api/nfs-ganesha/export")
        self.assertEqual(len(exports), 1)
        self.assertDictEqual(exports[0], data)
        return data

    def test_create_export(self):
        """An export of ``/foo`` can be created and is listed afterwards."""
        self._test_create_export('/foo')

    def test_create_export_for_cephfs_root(self):
        """An export of the root path ``/`` can be created and is listed."""
        self._test_create_export('/')

    def test_update_export(self):
        """PUT changes to an export are reflected in the export list."""
        export = self._test_create_export('/foo')
        export['access_type'] = 'RO'
        export['daemons'] = ['node1', 'node3']
        export['security_label'] = True
        data = self._task_put('/api/nfs-ganesha/export/{}/{}'
                              .format(export['cluster_id'], export['export_id']),
                              export)
        exports = self._get("/api/nfs-ganesha/export")
        self.assertEqual(len(exports), 1)
        self.assertDictEqual(exports[0], data)
        self.assertEqual(exports[0]['daemons'], ['node1', 'node3'])
        self.assertEqual(exports[0]['security_label'], True)

    def test_delete_export(self):
        """Deleting an existing export answers with status 204."""
        export = self._test_create_export('/foo')
        self._task_delete("/api/nfs-ganesha/export/{}/{}"
                          .format(export['cluster_id'], export['export_id']))
        self.assertStatus(204)

    def test_get_export(self):
        """Exports 1 and 2 of cluster2 are retrievable individually."""
        exports = self._get("/api/nfs-ganesha/export")
        self.assertEqual(len(exports), 0)

        data1 = self.create_export("/foo", 'cluster2', ['node1', 'node2'], 'CEPH')
        data2 = self.create_export("mybucket", 'cluster2', ['node2', 'node3'], 'RGW')

        export1 = self._get("/api/nfs-ganesha/export/cluster2/1")
        self.assertDictEqual(export1, data1)

        export2 = self._get("/api/nfs-ganesha/export/cluster2/2")
        self.assertDictEqual(export2, data2)

    def test_invalid_status(self):
        """With the cluster mapping cleared, status reports 'not available'."""
        self._ceph_cmd(['dashboard', 'set-ganesha-clusters-rados-pool-namespace', ''])
        try:
            data = self._get('/api/nfs-ganesha/status')
            self.assertStatus(200)
            self.assertIn('available', data)
            self.assertIn('message', data)
            self.assertFalse(data['available'])
            self.assertIn(("NFS-Ganesha cluster is not detected. "
                           "Please set the GANESHA_RADOS_POOL_NAMESPACE "
                           "setting or deploy an NFS-Ganesha cluster with the Orchestrator."),
                          data['message'])
        finally:
            # Restore the mapping even when an assertion above fails;
            # otherwise the remaining tests of this class would run against
            # a dashboard that has no Ganesha cluster configured.
            self._ceph_cmd(['dashboard', 'set-ganesha-clusters-rados-pool-namespace',
                            self._CLUSTERS_POOL_NAMESPACE])

    def test_valid_status(self):
        """With the cluster mapping in place, status reports 'available'."""
        data = self._get('/api/nfs-ganesha/status')
        self.assertStatus(200)
        self.assertIn('available', data)
        self.assertIn('message', data)
        self.assertTrue(data['available'])

    def test_ganesha_fsals(self):
        """The FSAL list of the UI API contains ``CEPH``."""
        data = self._get('/ui-api/nfs-ganesha/fsals')
        self.assertStatus(200)
        self.assertIn('CEPH', data)

    def test_ganesha_filesystems(self):
        """The filesystem list is a list of ``{id: int, name: str}`` objects."""
        data = self._get('/ui-api/nfs-ganesha/cephfs/filesystems')
        self.assertStatus(200)
        self.assertSchema(data, JList(JObj({
            'id': int,
            'name': str
        })))

    def test_ganesha_lsdir(self):
        """``lsdir`` of every listed filesystem returns paths starting with ``/``."""
        fss = self._get('/ui-api/nfs-ganesha/cephfs/filesystems')
        self.assertStatus(200)
        for fs in fss:
            data = self._get('/ui-api/nfs-ganesha/lsdir/{}'.format(fs['name']))
            self.assertStatus(200)
            self.assertSchema(data, JObj({'paths': JList(str)}))
            self.assertEqual(data['paths'][0], '/')

    def test_ganesha_buckets(self):
        """The RGW bucket list is a list of strings."""
        data = self._get('/ui-api/nfs-ganesha/rgw/buckets')
        self.assertStatus(200)
        schema = JList(str)
        self.assertSchema(data, schema)

    def test_ganesha_clusters(self):
        """The cluster list is a list of strings."""
        data = self._get('/ui-api/nfs-ganesha/clusters')
        self.assertStatus(200)
        schema = JList(str)
        self.assertSchema(data, schema)

    def test_ganesha_cephx_clients(self):
        """The cephx client list is a list of strings."""
        data = self._get('/ui-api/nfs-ganesha/cephx/clients')
        self.assertStatus(200)
        schema = JList(str)
        self.assertSchema(data, schema)