]> git.proxmox.com Git - ceph.git/blame - ceph/qa/tasks/cephfs/caps_helper.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / qa / tasks / cephfs / caps_helper.py
CommitLineData
f67539c2
TL
1"""
2Helper methods to test that MON and MDS caps are enforced properly.
3"""
4from tasks.cephfs.cephfs_test_case import CephFSTestCase
5
6from teuthology.orchestra.run import Raw
7
class CapsHelper(CephFSTestCase):
    """Helper methods to test that MON and MDS caps are enforced properly."""

    def run_mon_cap_tests(self, moncap, keyring):
        """Check that ``fs ls`` run with ``keyring`` honours ``moncap``.

        The keyring is handed to ``self.create_keyring_file`` together with
        ``self.fs.admin_remote``, and ``fs ls`` is then run as
        ``self.client_id`` using the returned keyring path.

        :param moncap: MON cap string of the client under test.
        :param keyring: keyring contents for that client.
        """
        keyring_path = self.create_keyring_file(self.fs.admin_remote, keyring)

        fsls = self.run_cluster_cmd(f'fs ls --id {self.client_id} -k '
                                    f'{keyring_path}')

        # we need to check only for default FS when fsname clause is absent
        # in MON/MDS caps
        if 'fsname' not in moncap:
            self.assertIn(self.fs.name, fsls)
            return

        # Tests that set up two file systems expose them as fs1/fs2;
        # otherwise only the default file system is checked.
        if hasattr(self, 'fs1'):
            fss = (self.fs1.name, self.fs2.name)
        else:
            fss = (self.fs.name,)

        for fsname in fss:
            listing_entry = 'name: ' + fsname
            if fsname in moncap:
                self.assertIn(listing_entry, fsls)
            else:
                self.assertNotIn(listing_entry, fsls)

    def run_mds_cap_tests(self, filepaths, filedata, mounts, perm):
        """Run the read test, then the write test that matches ``perm``.

        :param filepaths: paths of the files to test.
        :param filedata: expected contents, paired positionally with
            ``filepaths``.
        :param mounts: mount objects on which the files are accessed.
        :param perm: ``'rw'`` to expect writes to succeed, ``'r'`` to expect
            them to be refused.
        :raises RuntimeError: if ``perm`` is neither ``'r'`` nor ``'rw'``.
        """
        self.conduct_pos_test_for_read_caps(filepaths, filedata, mounts)

        if perm == 'rw':
            self.conduct_pos_test_for_write_caps(filepaths, mounts)
        elif perm == 'r':
            self.conduct_neg_test_for_write_caps(filepaths, mounts)
        else:
            raise RuntimeError(f'perm = {perm}\nIt should be "r" or "rw".')

    def conduct_pos_test_for_read_caps(self, filepaths, filedata, mounts):
        """Assert that each file read through its mount has the expected data.

        :param filepaths: paths of the files to read.
        :param filedata: expected contents, paired positionally with
            ``filepaths``.
        :param mounts: mount objects used to read the files.
        """
        for mount in mounts:
            for path, data in zip(filepaths, filedata):
                # XXX: conduct tests only if path belongs to current mount; in
                # teuth tests client are located on same machines.
                if mount.hostfs_mntpt in path:
                    contents = mount.read_file(path)
                    self.assertEqual(data, contents)

    def conduct_pos_test_for_write_caps(self, filepaths, mounts):
        """Write new data to each file and assert it can be read back.

        :param filepaths: paths of the files to write.
        :param mounts: mount objects used to write and re-read the files.
        """
        filedata = ('some new data on first fs', 'some new data on second fs')

        for mount in mounts:
            # NOTE: zip() stops at the shorter sequence, so at most the first
            # two entries of filepaths are exercised here.
            for path, data in zip(filepaths, filedata):
                if mount.hostfs_mntpt in path:
                    # test that write was successful
                    mount.write_file(path=path, data=data)
                    # verify that contents written was same as the one that
                    # was intended
                    contents1 = mount.read_file(path=path)
                    self.assertEqual(data, contents1)

    def conduct_neg_test_for_write_caps(self, filepaths, mounts):
        """Run a write command against each file via ``mount.negtestcmd``.

        Each file is targeted by its own ``echo ... | sudo tee <path>``
        command. ``negtestcmd`` is called with ``retval=1`` and
        ``errmsg='permission denied'``; presumably it asserts that the
        command fails that way (helper is defined outside this file).

        :param filepaths: paths of the files that must not be writable.
        :param mounts: mount objects on which the write is attempted.
        """
        base_args = ['echo', 'some random data', Raw('|'), 'sudo', 'tee']

        for mount in mounts:
            for path in filepaths:
                if mount.hostfs_mntpt in path:
                    # Build a fresh argument list for every path. Appending
                    # to one shared list would keep earlier paths (possibly
                    # from other mounts) in later commands, so those
                    # commands would no longer test a single file.
                    mount.negtestcmd(args=base_args + [path], retval=1,
                                     errmsg='permission denied')

    def get_mon_cap_from_keyring(self, client_name):
        """Return the MON cap string from ``auth get <client_name>`` output.

        The first output line containing ``caps mon`` is used; the returned
        value is the text after `` = "`` with the line's last character
        (expected to be the closing quote) dropped.

        :param client_name: entity name passed to ``auth get``.
        :raises RuntimeError: if no line of the output contains ``caps mon``.
        """
        keyring = self.run_cluster_cmd(cmd=f'auth get {client_name}')
        for line in keyring.split('\n'):
            if 'caps mon' in line:
                return line[line.find(' = "') + 4:-1]

        raise RuntimeError('get_mon_cap_from_keyring: mon cap not found in '
                           'keyring. keyring -\n' + keyring)