from __future__ import absolute_import
-from .helper import DashboardTestCase
+from .helper import DashboardTestCase, JList, JObj
class GaneshaTest(DashboardTestCase):
'user', 'create', '--uid', 'admin', '--display-name', 'admin',
'--system', '--access-key', 'admin', '--secret', 'admin'
])
- cls._ceph_cmd(['dashboard', 'set-rgw-api-secret-key', 'admin'])
- cls._ceph_cmd(['dashboard', 'set-rgw-api-access-key', 'admin'])
+ cls._ceph_cmd_with_secret(['dashboard', 'set-rgw-api-secret-key'], 'admin')
+ cls._ceph_cmd_with_secret(['dashboard', 'set-rgw-api-access-key'], 'admin')
@classmethod
def tearDownClass(cls):
@classmethod
def create_export(cls, path, cluster_id, daemons, fsal, sec_label_xattr=None):
if fsal == 'CEPH':
- fsal = {"name": "CEPH", "user_id":"admin", "fs_name": None, "sec_label_xattr": sec_label_xattr}
+ fsal = {"name": "CEPH", "user_id": "admin", "fs_name": None, "sec_label_xattr": sec_label_xattr}
pseudo = "/cephfs{}".format(path)
else:
fsal = {"name": "RGW", "rgw_user_id": "admin"}
"protocols": [4],
"transports": ["TCP"],
"clients": [{
- "addresses":["10.0.0.0/8"],
+ "addresses": ["10.0.0.0/8"],
"access_type": "RO",
"squash": "root"
}]
self._task_delete("/api/nfs-ganesha/export/{}/{}"
.format(exp['cluster_id'], exp['export_id']))
- def test_create_export(self):
+ def _test_create_export(self, cephfs_path):
exports = self._get("/api/nfs-ganesha/export")
self.assertEqual(len(exports), 0)
- data = self.create_export("/foo", 'cluster1', ['node1', 'node2'], 'CEPH', "security.selinux")
+ data = self.create_export(cephfs_path, 'cluster1', ['node1', 'node2'], 'CEPH', "security.selinux")
exports = self._get("/api/nfs-ganesha/export")
self.assertEqual(len(exports), 1)
self.assertDictEqual(exports[0], data)
return data
+ def test_create_export(self):
+ self._test_create_export('/foo')
+
+ def test_create_export_for_cephfs_root(self):
+ self._test_create_export('/')
+
def test_update_export(self):
- export = self.test_create_export()
+ export = self._test_create_export('/foo')
export['access_type'] = 'RO'
export['daemons'] = ['node1', 'node3']
export['security_label'] = True
self.assertEqual(exports[0]['security_label'], True)
def test_delete_export(self):
- export = self.test_create_export()
+ export = self._test_create_export('/foo')
self._task_delete("/api/nfs-ganesha/export/{}/{}"
.format(export['cluster_id'], export['export_id']))
self.assertStatus(204)
self.assertIn('available', data)
self.assertIn('message', data)
self.assertFalse(data['available'])
- self.assertIn('Ganesha config location is not configured. Please set the GANESHA_RADOS_POOL_NAMESPACE setting.',
+ self.assertIn(("NFS-Ganesha cluster is not detected. "
+ "Please set the GANESHA_RADOS_POOL_NAMESPACE "
+ "setting or deploy an NFS-Ganesha cluster with the Orchestrator."),
data['message'])
self._ceph_cmd(['dashboard', 'set-ganesha-clusters-rados-pool-namespace', 'cluster1:ganesha/ganesha1,cluster2:ganesha/ganesha2'])
self.assertIn('available', data)
self.assertIn('message', data)
self.assertTrue(data['available'])
+
+ def test_ganesha_fsals(self):
+ data = self._get('/ui-api/nfs-ganesha/fsals')
+ self.assertStatus(200)
+ self.assertIn('CEPH', data)
+
+ def test_ganesha_filesystems(self):
+ data = self._get('/ui-api/nfs-ganesha/cephfs/filesystems')
+ self.assertStatus(200)
+ self.assertSchema(data, JList(JObj({
+ 'id': int,
+ 'name': str
+ })))
+
+ def test_ganesha_lsdir(self):
+ fss = self._get('/ui-api/nfs-ganesha/cephfs/filesystems')
+ self.assertStatus(200)
+ for fs in fss:
+ data = self._get('/ui-api/nfs-ganesha/lsdir/{}'.format(fs['name']))
+ self.assertStatus(200)
+ self.assertSchema(data, JObj({'paths': JList(str)}))
+ self.assertEqual(data['paths'][0], '/')
+
+ def test_ganesha_buckets(self):
+ data = self._get('/ui-api/nfs-ganesha/rgw/buckets')
+ self.assertStatus(200)
+ schema = JList(str)
+ self.assertSchema(data, schema)
+
+ def test_ganesha_clusters(self):
+ data = self._get('/ui-api/nfs-ganesha/clusters')
+ self.assertStatus(200)
+ schema = JList(str)
+ self.assertSchema(data, schema)
+
+ def test_ganesha_cephx_clients(self):
+ data = self._get('/ui-api/nfs-ganesha/cephx/clients')
+ self.assertStatus(200)
+ schema = JList(str)
+ self.assertSchema(data, schema)