]> git.proxmox.com Git - ceph.git/blame - ceph/qa/tasks/cephfs/caps_helper.py
update ceph source to reef 18.2.0
[ceph.git] / ceph / qa / tasks / cephfs / caps_helper.py
CommitLineData
f67539c2
TL
1"""
2Helper methods to test that MON and MDS caps are enforced properly.
3"""
1e59de90
TL
4from os.path import join as os_path_join
5from logging import getLogger
6
f67539c2
TL
7from tasks.cephfs.cephfs_test_case import CephFSTestCase
8
9from teuthology.orchestra.run import Raw
10
f67539c2 11
1e59de90
TL
12log = getLogger(__name__)
13
14
15class CapTester(CephFSTestCase):
16 """
17 Test that MON and MDS caps are enforced.
18
19 MDS caps are tested by exercising read-write permissions and MON caps are
20 tested using output of command "ceph fs ls". Besides, it provides
21 write_test_files() which creates test files at the given path on CephFS
22 mounts passed to it.
23
24 USAGE: Call write_test_files() method at the beginning of the test and
25 once the caps that needs to be tested are assigned to the client and
26 CephFS be remount for caps to effective, call run_cap_tests(),
27 run_mon_cap_tests() or run_mds_cap_tests() as per the need.
28 """
29
30 def write_test_files(self, mounts, testpath=''):
31 """
32 Exercising 'r' and 'w' access levels on a file on CephFS mount is
33 pretty routine across all tests for caps. Adding to method to write
34 that file will reduce clutter in these tests.
35
36 This methods writes a fixed data in a file with a fixed name located
37 at the path passed in testpath for the given list of mounts. If
38 testpath is empty, the file is created at the root of the CephFS.
39 """
40 dirname, filename = 'testdir', 'testfile'
41 self.test_set = []
42 # XXX: The reason behind testpath[1:] below is that the testpath is
43 # supposed to contain a path inside CephFS (which might be passed as
44 # an absolute path). os.path.join() deletes all previous path
45 # components when it encounters a path component starting with '/'.
46 # Deleting the first '/' from the string in testpath ensures that
47 # previous path components are not deleted by os.path.join().
48 if testpath:
49 testpath = testpath[1:] if testpath[0] == '/' else testpath
50 # XXX: passing just '/' screw up os.path.join() ahead.
51 if testpath == '/':
52 testpath = ''
53
54 for mount_x in mounts:
55 log.info(f'creating test file on FS {mount_x.cephfs_name} '
56 f'mounted at {mount_x.mountpoint}...')
57 dirpath = os_path_join(mount_x.hostfs_mntpt, testpath, dirname)
58 mount_x.run_shell(f'mkdir {dirpath}')
59 filepath = os_path_join(dirpath, filename)
60 # XXX: the reason behind adding filepathm, cephfs_name and both
61 # mntpts is to avoid a test bug where we mount cephfs1 but what
62 # ends up being mounted cephfs2. since filepath and filedata are
63 # identical, how would tests figure otherwise that they are
64 # accessing the right filename but on wrong CephFS.
65 filedata = (f'filepath = {filepath}\n'
66 f'cephfs_name = {mount_x.cephfs_name}\n'
67 f'cephfs_mntpt = {mount_x.cephfs_mntpt}\n'
68 f'hostfs_mntpt = {mount_x.hostfs_mntpt}')
69 mount_x.write_file(filepath, filedata)
70 self.test_set.append((mount_x, filepath, filedata))
71 log.info('test file created at {path} with data "{data}.')
72
73 def run_cap_tests(self, perm, mntpt=None):
74 # TODO
75 #self.run_mon_cap_tests()
76 self.run_mds_cap_tests(perm, mntpt=mntpt)
77
78 def _get_fsnames_from_moncap(self, moncap):
79 fsnames = []
80 while moncap.find('fsname=') != -1:
81 fsname_first_char = moncap.index('fsname=') + len('fsname=')
82
83 if ',' in moncap:
84 last = moncap.index(',')
85 fsname = moncap[fsname_first_char : last]
86 moncap = moncap.replace(moncap[0 : last+1], '')
87 else:
88 fsname = moncap[fsname_first_char : ]
89 moncap = moncap.replace(moncap[0 : ], '')
90
91 fsnames.append(fsname)
92
93 return fsnames
94
95 def run_mon_cap_tests(self, def_fs, client_id):
96 """
97 Check that MON cap is enforced for a client by searching for a Ceph
98 FS name in output of cmd "fs ls" executed with that client's caps.
f67539c2 99
1e59de90
TL
100 def_fs stands for default FS on Ceph cluster.
101 """
102 get_cluster_cmd_op = def_fs.mon_manager.raw_cluster_cmd
f67539c2 103
1e59de90
TL
104 keyring = get_cluster_cmd_op(args=f'auth get client.{client_id}')
105
106 moncap = None
107 for line in keyring.split('\n'):
108 if 'caps mon' in line:
109 moncap = line[line.find(' = "') + 4 : -1]
110 break
111 else:
112 raise RuntimeError('run_mon_cap_tests(): mon cap not found in '
113 'keyring. keyring -\n' + keyring)
114
115 keyring_path = def_fs.admin_remote.mktemp(data=keyring)
116
117 fsls = get_cluster_cmd_op(
118 args=f'fs ls --id {client_id} -k {keyring_path}')
119 log.info(f'output of fs ls cmd run by client.{client_id} -\n{fsls}')
120
121 if 'fsname=' not in moncap:
122 log.info('no FS name is mentioned in moncap, client has '
123 'permission to list all files. moncap -\n{moncap}')
124 log.info('testing for presence of all FS names in output of '
125 '"fs ls" command run by client.')
126
127 fsls_admin = get_cluster_cmd_op(args='fs ls')
128 log.info('output of fs ls cmd run by admin -\n{fsls_admin}')
129
130 self.assertEqual(fsls, fsls_admin)
f67539c2
TL
131 return
132
1e59de90
TL
133 log.info('FS names are mentioned in moncap. moncap -\n{moncap}')
134 log.info('testing for presence of these FS names in output of '
135 '"fs ls" command run by client.')
136 for fsname in self._get_fsnames_from_moncap(moncap):
137 self.assertIn('name: ' + fsname, fsls)
f67539c2 138
1e59de90
TL
139 def run_mds_cap_tests(self, perm, mntpt=None):
140 """
141 Run test for read perm and, for write perm, run positive test if it
142 is present and run negative test if not.
143 """
144 # XXX: mntpt is path inside cephfs that serves as root for current
145 # mount. Therefore, this path must me deleted from self.filepaths.
146 # Example -
147 # orignal path: /mnt/cephfs_x/dir1/dir2/testdir
148 # cephfs dir serving as root for current mnt: /dir1/dir2
149 # therefore, final path: /mnt/cephfs_x//testdir
150 if mntpt:
151 self.test_set = [(x, y.replace(mntpt, ''), z) for x, y, z in \
152 self.test_set]
153
154 self.conduct_pos_test_for_read_caps()
f67539c2
TL
155
156 if perm == 'rw':
1e59de90 157 self.conduct_pos_test_for_write_caps()
f67539c2 158 elif perm == 'r':
1e59de90 159 self.conduct_neg_test_for_write_caps()
f67539c2
TL
160 else:
161 raise RuntimeError(f'perm = {perm}\nIt should be "r" or "rw".')
162
1e59de90
TL
163 def conduct_pos_test_for_read_caps(self):
164 for mount, path, data in self.test_set:
165 log.info(f'test read perm: read file {path} and expect data '
166 f'"{data}"')
167 contents = mount.read_file(path)
168 self.assertEqual(data, contents)
169 log.info(f'read perm was tested successfully: "{data}" was '
170 f'successfully read from path {path}')
171
172 def conduct_pos_test_for_write_caps(self):
173 for mount, path, data in self.test_set:
174 log.info(f'test write perm: try writing data "{data}" to '
175 f'file {path}.')
176 mount.write_file(path=path, data=data)
177 contents = mount.read_file(path=path)
178 self.assertEqual(data, contents)
179 log.info(f'write perm was tested was successfully: data '
180 f'"{data}" was successfully written to file "{path}".')
181
182 def conduct_neg_test_for_write_caps(self, sudo_write=False):
183 possible_errmsgs = ('permission denied', 'operation not permitted')
184 cmdargs = ['echo', 'some random data', Raw('|')]
185 cmdargs += ['sudo', 'tee'] if sudo_write else ['tee']
f67539c2 186
1e59de90
TL
187 # don't use data, cmd args to write are set already above.
188 for mount, path, data in self.test_set:
189 log.info('test absence of write perm: expect failure '
190 f'writing data to file {path}.')
191 cmdargs.append(path)
192 mount.negtestcmd(args=cmdargs, retval=1, errmsgs=possible_errmsgs)
193 cmdargs.pop(-1)
194 log.info('absence of write perm was tested successfully: '
195 f'failed to be write data to file {path}.')