]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | """ |
2 | Helper methods to test that MON and MDS caps are enforced properly. | |
3 | """ | |
4 | from tasks.cephfs.cephfs_test_case import CephFSTestCase | |
5 | ||
6 | from teuthology.orchestra.run import Raw | |
7 | ||
class CapsHelper(CephFSTestCase):
    """
    Helper methods for checking that MON and MDS caps are enforced.

    The helpers rely on attributes and methods that are not defined in this
    class (``self.fs``, ``self.client_id``, ``self.run_cluster_cmd``,
    ``self.create_keyring_file`` and, optionally, ``self.fs1``/``self.fs2``);
    presumably they are provided by CephFSTestCase or by the concrete test
    class -- confirm against those definitions.
    """

    def run_mon_cap_tests(self, moncap, keyring):
        """
        Run "fs ls" with the given keyring and check which FSs are listed.

        :param moncap: MON cap string the client was authorized with.
        :param keyring: keyring contents; written to a file on
            ``self.fs.admin_remote`` via ``self.create_keyring_file``.
        """
        keyring_path = self.create_keyring_file(self.fs.admin_remote, keyring)

        fsls = self.run_cluster_cmd(f'fs ls --id {self.client_id} -k '
                                    f'{keyring_path}')

        # we need to check only for default FS when fsname clause is absent
        # in MON/MDS caps
        if 'fsname' not in moncap:
            self.assertIn(self.fs.name, fsls)
            return

        if hasattr(self, 'fs1'):
            fss = (self.fs1.name, self.fs2.name)
        else:
            fss = (self.fs.name,)

        # NOTE(review): both checks below are plain substring tests, so an FS
        # name that is a prefix of another FS name (or that occurs elsewhere
        # in the cap string) can match unintentionally -- confirm that the FS
        # names used by callers cannot collide this way.
        for fsname in fss:
            if fsname in moncap:
                self.assertIn('name: ' + fsname, fsls)
            else:
                self.assertNotIn('name: ' + fsname, fsls)

    def run_mds_cap_tests(self, filepaths, filedata, mounts, perm):
        """
        Check read access and then, depending on perm, write access.

        :param filepaths: paths of the files to be tested.
        :param filedata: expected contents, paired by position with
            filepaths.
        :param mounts: mount objects through which the files are accessed.
        :param perm: 'rw' to expect writes to succeed, 'r' to expect writes
            to be denied.
        :raises RuntimeError: if perm is neither 'r' nor 'rw'. Note that the
            read test has already been conducted by then.
        """
        self.conduct_pos_test_for_read_caps(filepaths, filedata, mounts)

        if perm == 'rw':
            self.conduct_pos_test_for_write_caps(filepaths, mounts)
        elif perm == 'r':
            self.conduct_neg_test_for_write_caps(filepaths, mounts)
        else:
            raise RuntimeError(f'perm = {perm}\nIt should be "r" or "rw".')

    def conduct_pos_test_for_read_caps(self, filepaths, filedata, mounts):
        """
        Assert that every file read through its mount has the expected data.

        filepaths and filedata are paired by position; a path is tested only
        on a mount whose ``hostfs_mntpt`` occurs in that path.
        """
        for mount in mounts:
            for path, data in zip(filepaths, filedata):
                # XXX: conduct tests only if path belongs to current mount; in
                # teuth tests client are located on same machines.
                if mount.hostfs_mntpt in path:
                    contents = mount.read_file(path)
                    self.assertEqual(data, contents)

    def conduct_pos_test_for_write_caps(self, filepaths, mounts):
        """
        Write new data to each file and assert that reading it back matches.

        NOTE(review): filedata below has two entries, so zip() stops after
        the first two items of filepaths; any further paths are not
        exercised.
        """
        filedata = ('some new data on first fs', 'some new data on second fs')

        for mount in mounts:
            for path, data in zip(filepaths, filedata):
                if mount.hostfs_mntpt in path:
                    # test that write was successful
                    mount.write_file(path=path, data=data)
                    # verify that contents written was same as the one that
                    # was intended
                    contents1 = mount.read_file(path=path)
                    self.assertEqual(data, contents1)

    def conduct_neg_test_for_write_caps(self, filepaths, mounts):
        """
        Assert that writing to each file through its mount is denied.

        For every path that belongs to a mount, "echo ... | sudo tee <path>"
        is passed to ``mount.negtestcmd`` with an expected return value of 1
        and the error message 'permission denied'.
        """
        base_cmdargs = ['echo', 'some random data', Raw('|'), 'sudo', 'tee']

        for mount in mounts:
            for path in filepaths:
                if mount.hostfs_mntpt in path:
                    # Build a fresh argument list for every invocation so
                    # that paths tested earlier (possibly belonging to other
                    # mounts) are not passed to tee again.
                    mount.negtestcmd(args=base_cmdargs + [path], retval=1,
                                     errmsg='permission denied')

    def get_mon_cap_from_keyring(self, client_name):
        """
        Return the MON cap of client_name as printed by "auth get".

        The value is the text between the double quotes on the first keyring
        line that contains 'caps mon' and the ' = "' delimiter.

        :raises RuntimeError: if no such line is found in the keyring.
        """
        keyring = self.run_cluster_cmd(cmd=f'auth get {client_name}')

        for line in keyring.splitlines():
            if 'caps mon' not in line:
                continue

            _, delim, value = line.partition(' = "')
            if not delim:
                # not of the form 'caps mon = "..."'; keep looking
                continue

            # drop trailing whitespace and then the closing quote, if present
            value = value.rstrip()
            if value.endswith('"'):
                value = value[:-1]
            return value

        raise RuntimeError('get_mon_cap_from_keyring: mon cap not found in '
                           'keyring. keyring -\n' + keyring)