]>
git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/cephfs/caps_helper.py
f3bc14b074205e0dbbb7be2fb63e40d57fd32627
2 Helper methods to test that MON and MDS caps are enforced properly.
4 from tasks
.cephfs
.cephfs_test_case
import CephFSTestCase
6 from teuthology
.orchestra
.run
import Raw
8 class CapsHelper(CephFSTestCase
):
10 def run_mon_cap_tests(self
, moncap
, keyring
):
11 keyring_path
= self
.fs
.admin_remote
.mktemp(data
=keyring
)
13 fsls
= self
.run_cluster_cmd(f
'fs ls --id {self.client_id} -k '
16 # we need to check only for default FS when fsname clause is absent
18 if 'fsname' not in moncap
:
19 self
.assertIn(self
.fs
.name
, fsls
)
22 fss
= (self
.fs1
.name
, self
.fs2
.name
) if hasattr(self
, 'fs1') else \
26 self
.assertIn('name: ' + fsname
, fsls
)
28 self
.assertNotIn('name: ' + fsname
, fsls
)
30 def run_mds_cap_tests(self
, filepaths
, filedata
, mounts
, perm
):
31 self
.conduct_pos_test_for_read_caps(filepaths
, filedata
, mounts
)
34 self
.conduct_pos_test_for_write_caps(filepaths
, mounts
)
36 self
.conduct_neg_test_for_write_caps(filepaths
, mounts
)
38 raise RuntimeError(f
'perm = {perm}\nIt should be "r" or "rw".')
40 def conduct_pos_test_for_read_caps(self
, filepaths
, filedata
, mounts
):
42 for path
, data
in zip(filepaths
, filedata
):
43 # XXX: conduct tests only if path belongs to current mount; in
44 # teuth tests client are located on same machines.
45 if path
.find(mount
.hostfs_mntpt
) != -1:
46 contents
= mount
.read_file(path
)
47 self
.assertEqual(data
, contents
)
49 def conduct_pos_test_for_write_caps(self
, filepaths
, mounts
):
50 filedata
= ('some new data on first fs', 'some new data on second fs')
53 for path
, data
in zip(filepaths
, filedata
):
54 if path
.find(mount
.hostfs_mntpt
) != -1:
55 # test that write was successful
56 mount
.write_file(path
=path
, data
=data
)
57 # verify that contents written was same as the one that was
59 contents1
= mount
.read_file(path
=path
)
60 self
.assertEqual(data
, contents1
)
62 def conduct_neg_test_for_write_caps(self
, filepaths
, mounts
):
63 cmdargs
= ['echo', 'some random data', Raw('|'), 'tee']
66 for path
in filepaths
:
67 if path
.find(mount
.hostfs_mntpt
) != -1:
69 mount
.negtestcmd(args
=cmdargs
, retval
=1,
70 errmsg
='permission denied')
72 def get_mon_cap_from_keyring(self
, client_name
):
73 keyring
= self
.run_cluster_cmd(cmd
=f
'auth get {client_name}')
74 for line
in keyring
.split('\n'):
75 if 'caps mon' in line
:
76 return line
[line
.find(' = "') + 4 : -1]
78 raise RuntimeError('get_save_mon_cap: mon cap not found in keyring. '
79 'keyring -\n' + keyring
)