ceph/qa/tasks/cephfs/cephfs_test_case.py
import json
import logging
import os
import re

from shlex import split as shlex_split

from tasks.ceph_test_case import CephTestCase

from teuthology import contextutil
from teuthology.orchestra import run
from teuthology.exceptions import CommandFailedError

log = logging.getLogger(__name__)

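# Helper decorator: @classhook("name") calls the classmethod named `name` on the
# decorated class when the class is defined, then returns the class unchanged
# (used to generate extra test methods dynamically at import time).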
def classhook(m):
    def dec(cls):
        getattr(cls, m)()
        return cls
    return dec

def for_teuthology(f):
    """
    Decorator that adds an "is_for_teuthology" attribute to the wrapped function
    """
    f.is_for_teuthology = True
    return f


def needs_trimming(f):
    """
    Mark fn as requiring a client capable of trimming its cache (i.e. for ceph-fuse
    this means it needs to be able to run as root, currently)
    """
    f.needs_trimming = True
    return f


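# Snapshot of the identifying fields of a mount object (client id, keyring path,
# remote, filesystem name and mount points) so they can be restored after a test
# that modifies them; see _save_mount_details() and tearDown() below.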
class MountDetails():

    def __init__(self, mntobj):
        self.client_id = mntobj.client_id
        self.client_keyring_path = mntobj.client_keyring_path
        self.client_remote = mntobj.client_remote
        self.cephfs_name = mntobj.cephfs_name
        self.cephfs_mntpt = mntobj.cephfs_mntpt
        self.hostfs_mntpt = mntobj.hostfs_mntpt

    def restore(self, mntobj):
        mntobj.client_id = self.client_id
        mntobj.client_keyring_path = self.client_keyring_path
        mntobj.client_remote = self.client_remote
        mntobj.cephfs_name = self.cephfs_name
        mntobj.cephfs_mntpt = self.cephfs_mntpt
        mntobj.hostfs_mntpt = self.hostfs_mntpt


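# A minimal sketch of how a subclass is typically written (illustrative only; the
# class and test names here are hypothetical):
#
#   class TestExample(CephFSTestCase):
#       CLIENTS_REQUIRED = 2
#       MDSS_REQUIRED = 2
#
#       def test_example(self):
#           self.mount_a.run_shell(["touch", "somefile"])
#           self.assert_session_count(2)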
class CephFSTestCase(CephTestCase):
    """
    Test case for CephFS. Requires the caller to populate the Filesystem and Mounts
    into the fs, mount_a and mount_b class attributes (setting mount_b is optional).

    Handles resetting the cluster under test between tests.
    """

    # FIXME weird explicit naming
    mount_a = None
    mount_b = None
    recovery_mount = None

    # Declarative test requirements: subclasses should override these to indicate
    # their special needs. If not met, tests will be skipped.
    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1
    REQUIRE_ONE_CLIENT_REMOTE = False

    # Whether to create the default filesystem during setUp
    REQUIRE_FILESYSTEM = True

    # Whether to create a backup filesystem as well; requires REQUIRE_FILESYSTEM
    # to be enabled.
    REQUIRE_BACKUP_FILESYSTEM = False

    LOAD_SETTINGS = []  # type: ignore

    def _save_mount_details(self):
        """
        XXX: Tests may change details of mount objects, so let's stash them so
        that these details are restored later to ensure smooth setUps and
        tearDowns for upcoming tests.
        """
        self._orig_mount_details = [MountDetails(m) for m in self.mounts]
        log.info(self._orig_mount_details)

    def _remove_blocklist(self):
        # In case anything is in the OSD blocklist, clear it out. This is to avoid
        # the OSD map changing in the background (due to blocklist expiry) while tests run.
        try:
            self.mds_cluster.mon_manager.run_cluster_cmd(args="osd blocklist clear")
        except CommandFailedError:
            # Fallback for older Ceph clusters
            try:
                blocklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd("osd",
                                       "dump", "--format=json-pretty"))['blocklist']
                log.info(f"Removing {len(blocklist)} blocklist entries")
                for addr, blocklisted_at in blocklist.items():
                    self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blocklist", "rm", addr)
            except KeyError:
                # Fallback for even older Ceph clusters, which use 'blacklist' instead.
                blacklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd("osd",
                                       "dump", "--format=json-pretty"))['blacklist']
                log.info(f"Removing {len(blacklist)} blacklist entries")
                for addr, blocklisted_at in blacklist.items():
                    self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "rm", addr)

    def setUp(self):
        super(CephFSTestCase, self).setUp()

        self.config_set('mon', 'mon_allow_pool_delete', True)

        if len(self.mds_cluster.mds_ids) < self.MDSS_REQUIRED:
            self.skipTest("Only have {0} MDSs, require {1}".format(
                len(self.mds_cluster.mds_ids), self.MDSS_REQUIRED
            ))

        if len(self.mounts) < self.CLIENTS_REQUIRED:
            self.skipTest("Only have {0} clients, require {1}".format(
                len(self.mounts), self.CLIENTS_REQUIRED
            ))

        if self.REQUIRE_ONE_CLIENT_REMOTE:
            if self.mounts[0].client_remote.hostname in self.mds_cluster.get_mds_hostnames():
                self.skipTest("Require first client to be on separate server from MDSs")

        # Create friendly mount_a, mount_b attrs
        for i in range(0, self.CLIENTS_REQUIRED):
            setattr(self, "mount_{0}".format(chr(ord('a') + i)), self.mounts[i])

        self.mds_cluster.clear_firewall()

        # Unmount all clients, we are about to blow away the filesystem
        for mount in self.mounts:
            if mount.is_mounted():
                mount.umount_wait(force=True)
        self._save_mount_details()

        # To avoid any issues with e.g. unlink bugs, we destroy and recreate
        # the filesystem rather than just doing a rm -rf of files
        self.mds_cluster.delete_all_filesystems()
        self.mds_cluster.mds_restart()  # to reset any run-time configs, etc.
        self.fs = None  # is now invalid!
        self.backup_fs = None
        self.recovery_fs = None

        self._remove_blocklist()

        client_mount_ids = [m.client_id for m in self.mounts]
        # In case there were any extra auth identities around from a previous
        # test, delete them
        for entry in self.auth_list():
            ent_type, ent_id = entry['entity'].split(".")
            if ent_type == "client" and ent_id not in client_mount_ids and not (ent_id == "admin" or ent_id[:6] == 'mirror'):
                self.mds_cluster.mon_manager.raw_cluster_cmd("auth", "del", entry['entity'])

        if self.REQUIRE_FILESYSTEM:
            self.fs = self.mds_cluster.newfs(create=True)

            # In case some test messed with auth caps, reset them
            for client_id in client_mount_ids:
                cmd = ['auth', 'caps', f'client.{client_id}', 'mon', 'allow r',
                       'osd', f'allow rw tag cephfs data={self.fs.name}',
                       'mds', 'allow']

                if self.run_cluster_cmd_result(cmd) == 0:
                    break

                cmd[1] = 'add'
                if self.run_cluster_cmd_result(cmd) != 0:
                    raise RuntimeError(f'Failed to create new client {cmd[2]}')

            # wait for ranks to become active
            self.fs.wait_for_daemons()

            # Mount the requested number of clients
            for i in range(0, self.CLIENTS_REQUIRED):
                self.mounts[i].mount_wait()

        if self.REQUIRE_BACKUP_FILESYSTEM:
            if not self.REQUIRE_FILESYSTEM:
                self.skipTest("backup filesystem requires a primary filesystem as well")
            self.fs.mon_manager.raw_cluster_cmd('fs', 'flag', 'set',
                                                'enable_multiple', 'true',
                                                '--yes-i-really-mean-it')
            self.backup_fs = self.mds_cluster.newfs(name="backup_fs")
            self.backup_fs.wait_for_daemons()

        # Load any config settings of interest
        for setting in self.LOAD_SETTINGS:
            setattr(self, setting, float(self.fs.mds_asok(
                ['config', 'get', setting], list(self.mds_cluster.mds_ids)[0]
            )[setting]))

        self.configs_set = set()

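    # Undo what setUp did: tear down mounts and filesystems, restore the saved
    # mount details and clear any ceph.conf overrides registered via set_conf().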
    def tearDown(self):
        self.mds_cluster.clear_firewall()
        for m in self.mounts:
            m.teardown()

        # To prevent failover messages during unwind of the ceph task
        self.mds_cluster.delete_all_filesystems()

        for m, md in zip(self.mounts, self._orig_mount_details):
            md.restore(m)

        for subsys, key in self.configs_set:
            self.mds_cluster.clear_ceph_conf(subsys, key)

        return super(CephFSTestCase, self).tearDown()

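    # Set a ceph.conf option, remembering it so tearDown() can clear it again.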
    def set_conf(self, subsys, key, value):
        self.configs_set.add((subsys, key))
        self.mds_cluster.set_ceph_conf(subsys, key, value)

    def auth_list(self):
        """
        Convenience wrapper on "ceph auth ls"
        """
        return json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd(
            "auth", "ls", "--format=json-pretty"
        ))['auth_dump']

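    # The session helpers below operate on the output of the MDS admin socket command
    # "session ls", which returns a list of per-client session dicts.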
    def assert_session_count(self, expected, ls_data=None, mds_id=None):
        if ls_data is None:
            ls_data = self.fs.mds_asok(['session', 'ls'], mds_id=mds_id)

        alive_count = len([s for s in ls_data if s['state'] != 'killing'])

        self.assertEqual(expected, alive_count, "Expected {0} sessions, found {1}".format(
            expected, alive_count
        ))

    def assert_session_state(self, client_id, expected_state):
        self.assertEqual(
            self._session_by_id(
                self.fs.mds_asok(['session', 'ls'])).get(client_id, {'state': None})['state'],
            expected_state)

    def get_session_data(self, client_id):
        return self._session_by_id(client_id)

    def _session_list(self):
        ls_data = self.fs.mds_asok(['session', 'ls'])
        ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
        return ls_data

    def get_session(self, client_id, session_ls=None):
        if session_ls is None:
            session_ls = self.fs.mds_asok(['session', 'ls'])

        return self._session_by_id(session_ls)[client_id]

    def _session_by_id(self, session_ls):
        return dict([(s['id'], s) for s in session_ls])

    def perf_dump(self, rank=None, status=None):
        return self.fs.rank_asok(['perf', 'dump'], rank=rank, status=status)

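    # Poll the session list until the given client no longer appears, i.e. until its
    # eviction from the MDS has taken effect.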
    def wait_until_evicted(self, client_id, timeout=30):
        def is_client_evicted():
            ls = self._session_list()
            for s in ls:
                if s['id'] == client_id:
                    return False
            return True
        self.wait_until_true(is_client_evicted, timeout)

    def wait_for_daemon_start(self, daemon_ids=None):
        """
        Wait until all the daemons appear in the FSMap, either assigned
        MDS ranks or in the list of standbys
        """
        def get_daemon_names():
            return [info['name'] for info in self.mds_cluster.status().get_all()]

        if daemon_ids is None:
            daemon_ids = self.mds_cluster.mds_ids

        try:
            self.wait_until_true(
                lambda: set(daemon_ids) & set(get_daemon_names()) == set(daemon_ids),
                timeout=30
            )
        except RuntimeError:
            log.warning("Timeout waiting for daemons {0}, while we have {1}".format(
                daemon_ids, get_daemon_names()
            ))
            raise

    def delete_mds_coredump(self, daemon_id):
        # delete coredump file, otherwise teuthology.internal.coredump will
        # catch it later and treat it as a failure.
        core_pattern = self.mds_cluster.mds_daemons[daemon_id].remote.sh(
            "sudo sysctl -n kernel.core_pattern")
        core_dir = os.path.dirname(core_pattern.strip())
        if core_dir:  # Non-default core_pattern with a directory in it
            # We have seen a core_pattern that looks like it's from teuthology's coredump
            # task, so proceed to clear out the core file
            if core_dir[0] == '|':
                log.info("Piped core dumps to program {0}, skip cleaning".format(core_dir[1:]))
                return

            log.info("Clearing core from directory: {0}".format(core_dir))

            # Verify that we see the expected single coredump
            ls_output = self.mds_cluster.mds_daemons[daemon_id].remote.sh([
                "cd", core_dir, run.Raw('&&'),
                "sudo", "ls", run.Raw('|'), "sudo", "xargs", "file"
            ])
            cores = [l.partition(":")[0]
                     for l in ls_output.strip().split("\n")
                     if re.match(r'.*ceph-mds.* -i +{0}'.format(daemon_id), l)]

            log.info("Enumerated cores: {0}".format(cores))
            self.assertEqual(len(cores), 1)

            log.info("Found core file {0}, deleting it".format(cores[0]))

            self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
                "cd", core_dir, run.Raw('&&'), "sudo", "rm", "-f", cores[0]
            ])
        else:
            log.info("No core_pattern directory set, nothing to clear (internal.coredump not enabled?)")

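    # Fetch subtree state from the MDS via the "get subtrees" admin socket command.
    # With rank="all", per-rank results are merged, keeping only subtrees that are
    # authoritative on that rank; transient EINVAL (22) failures are retried.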
    def _get_subtrees(self, status=None, rank=None, path=None):
        if path is None:
            path = "/"
        try:
            with contextutil.safe_while(sleep=1, tries=3) as proceed:
                while proceed():
                    try:
                        if rank == "all":
                            subtrees = []
                            for r in self.fs.get_ranks(status=status):
                                s = self.fs.rank_asok(["get", "subtrees"], status=status, rank=r['rank'])
                                s = filter(lambda s: s['auth_first'] == r['rank'] and s['auth_second'] == -2, s)
                                subtrees += s
                        else:
                            subtrees = self.fs.rank_asok(["get", "subtrees"], status=status, rank=rank)
                        subtrees = filter(lambda s: s['dir']['path'].startswith(path), subtrees)
                        return list(subtrees)
                    except CommandFailedError as e:
                        # Sometimes we get transient errors
                        if e.exitstatus == 22:
                            pass
                        else:
                            raise
        except contextutil.MaxWhileTries as e:
            raise RuntimeError(f"could not get subtree state from rank {rank}") from e

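    # Wait (polling) until the subtrees reported for `rank` match the expected list of
    # (path, auth rank) pairs exactly, optionally invoking `action` between polls.
    # A typical call might look like (illustrative values only):
    #   self._wait_subtrees([('/dir1', 0), ('/dir2', 1)], status=status, rank="all")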
    def _wait_subtrees(self, test, status=None, rank=None, timeout=30, sleep=2, action=None, path=None):
        test = sorted(test)
        try:
            with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
                while proceed():
                    subtrees = self._get_subtrees(status=status, rank=rank, path=path)
                    filtered = sorted([(s['dir']['path'], s['auth_first']) for s in subtrees])
                    log.info("%s =?= %s", filtered, test)
                    if filtered == test:
                        # Confirm export_pin in output is correct:
                        for s in subtrees:
                            if s['export_pin_target'] >= 0:
                                self.assertTrue(s['export_pin_target'] == s['auth_first'])
                        return subtrees
                    if action is not None:
                        action()
        except contextutil.MaxWhileTries as e:
            raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e

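    # Start a scrub on `path` (recursively by default) and wait for it to complete,
    # logging a message rather than failing the test if the timeout is hit.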
    def _wait_until_scrub_complete(self, path="/", recursive=True, timeout=100):
        out_json = self.fs.run_scrub(["start", path] + (["recursive"] if recursive else []))
        if not self.fs.wait_until_scrub_complete(tag=out_json["scrub_tag"],
                                                 sleep=10, timeout=timeout):
            log.info("timed out waiting for scrub to complete")

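    # Wait until at least `count` subtrees are pinned by the distributed ephemeral pin
    # policy and are owned by their export pin target rank.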
    def _wait_distributed_subtrees(self, count, status=None, rank=None, path=None):
        try:
            with contextutil.safe_while(sleep=5, tries=20) as proceed:
                while proceed():
                    subtrees = self._get_subtrees(status=status, rank=rank, path=path)
                    subtrees = list(filter(lambda s: s['distributed_ephemeral_pin'] == True and
                                           s['auth_first'] == s['export_pin_target'],
                                           subtrees))
                    log.info(f"len={len(subtrees)} {subtrees}")
                    if len(subtrees) >= count:
                        return subtrees
        except contextutil.MaxWhileTries as e:
            raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e

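    # Same as above, but for subtrees pinned by the random ephemeral pin policy.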
    def _wait_random_subtrees(self, count, status=None, rank=None, path=None):
        try:
            with contextutil.safe_while(sleep=5, tries=20) as proceed:
                while proceed():
                    subtrees = self._get_subtrees(status=status, rank=rank, path=path)
                    subtrees = list(filter(lambda s: s['random_ephemeral_pin'] == True and
                                           s['auth_first'] == s['export_pin_target'],
                                           subtrees))
                    log.info(f"len={len(subtrees)} {subtrees}")
                    if len(subtrees) >= count:
                        return subtrees
        except contextutil.MaxWhileTries as e:
            raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e

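    # Thin wrappers around the mon_manager cluster command helpers; `cmd` may be
    # either a pre-split argument list or a single string (split with shlex).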
    def run_cluster_cmd(self, cmd):
        if isinstance(cmd, str):
            cmd = shlex_split(cmd)
        return self.fs.mon_manager.raw_cluster_cmd(*cmd)

    def run_cluster_cmd_result(self, cmd):
        if isinstance(cmd, str):
            cmd = shlex_split(cmd)
        return self.fs.mon_manager.raw_cluster_cmd_result(*cmd)

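    # Create an auth entity for a new client. With no explicit caps, fall back to
    # Filesystem.authorize(), which grants rw on the root of the default filesystem.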
    def create_client(self, client_id, moncap=None, osdcap=None, mdscap=None):
        if not (moncap or osdcap or mdscap):
            if self.fs:
                return self.fs.authorize(client_id, ('/', 'rw'))
            else:
                raise RuntimeError('no caps were passed and the default FS '
                                   'is not created yet to allow client auth '
                                   'for it.')

        cmd = ['auth', 'add', f'client.{client_id}']
        if moncap:
            cmd += ['mon', moncap]
        if osdcap:
            cmd += ['osd', osdcap]
        if mdscap:
            cmd += ['mds', mdscap]

        self.run_cluster_cmd(cmd)
        return self.run_cluster_cmd(f'auth get {self.client_name}')