import json
import logging
import os
import re

from shlex import split as shlex_split

from tasks.ceph_test_case import CephTestCase

from teuthology import contextutil
from teuthology.orchestra import run
from teuthology.exceptions import CommandFailedError

log = logging.getLogger(__name__)

def for_teuthology(f):
    """
    Decorator that adds an "is_for_teuthology" attribute to the wrapped function
    """
    f.is_for_teuthology = True
    return f


def needs_trimming(f):
    """
    Mark fn as requiring a client capable of trimming its cache (i.e. for ceph-fuse
    this means it needs to be able to run as root, currently)
    """
    f.needs_trimming = True
    return f


class MountDetails():

    def __init__(self, mntobj):
        self.client_id = mntobj.client_id
        self.client_keyring_path = mntobj.client_keyring_path
        self.client_remote = mntobj.client_remote
        self.cephfs_name = mntobj.cephfs_name
        self.cephfs_mntpt = mntobj.cephfs_mntpt
        self.hostfs_mntpt = mntobj.hostfs_mntpt

    def restore(self, mntobj):
        mntobj.client_id = self.client_id
        mntobj.client_keyring_path = self.client_keyring_path
        mntobj.client_remote = self.client_remote
        mntobj.cephfs_name = self.cephfs_name
        mntobj.cephfs_mntpt = self.cephfs_mntpt
        mntobj.hostfs_mntpt = self.hostfs_mntpt


class CephFSTestCase(CephTestCase):
    """
    Test case for CephFS. Requires the caller to populate the Filesystem and
    Mounts into the fs, mount_a and mount_b class attributes (setting mount_b
    is optional).

    Handles resetting the cluster under test between tests.
    """

    # FIXME weird explicit naming
    mount_a = None
    mount_b = None
    recovery_mount = None

    # Declarative test requirements: subclasses should override these to indicate
    # their special needs. If not met, tests will be skipped.
    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1
    REQUIRE_ONE_CLIENT_REMOTE = False

    # Whether to create the default filesystem during setUp
    REQUIRE_FILESYSTEM = True

    # Requires REQUIRE_FILESYSTEM = True
    REQUIRE_RECOVERY_FILESYSTEM = False

    # Create a backup filesystem if required; requires REQUIRE_FILESYSTEM = True
    REQUIRE_BACKUP_FILESYSTEM = False

    LOAD_SETTINGS = []  # type: ignore
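
    # Illustrative sketch (hypothetical subclass, not part of this module) of how
    # the declarative requirements above and the decorators at the top of the file
    # are typically combined:
    #
    #   class TestMyFeature(CephFSTestCase):
    #       CLIENTS_REQUIRED = 2
    #       MDSS_REQUIRED = 2
    #
    #       @for_teuthology
    #       def test_something_slow(self):
    #           # intended for full teuthology runs only
    #           self.mount_a.run_shell(["touch", "testfile"])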

    def _save_mount_details(self):
        """
        XXX: Tests may change details of mount objects, so let's stash them so
        that these details are restored later to ensure smooth setUps and
        tearDowns for upcoming tests.
        """
        self._orig_mount_details = [MountDetails(m) for m in self.mounts]
        log.info(self._orig_mount_details)

    def setUp(self):
        super(CephFSTestCase, self).setUp()

        self.config_set('mon', 'mon_allow_pool_delete', True)

        if len(self.mds_cluster.mds_ids) < self.MDSS_REQUIRED:
            self.skipTest("Only have {0} MDSs, require {1}".format(
                len(self.mds_cluster.mds_ids), self.MDSS_REQUIRED
            ))

        if len(self.mounts) < self.CLIENTS_REQUIRED:
            self.skipTest("Only have {0} clients, require {1}".format(
                len(self.mounts), self.CLIENTS_REQUIRED
            ))

        if self.REQUIRE_ONE_CLIENT_REMOTE:
            if self.mounts[0].client_remote.hostname in self.mds_cluster.get_mds_hostnames():
                self.skipTest("Require first client to be on separate server from MDSs")

        # Create friendly mount_a, mount_b attrs
        for i in range(0, self.CLIENTS_REQUIRED):
            setattr(self, "mount_{0}".format(chr(ord('a') + i)), self.mounts[i])

        self.mds_cluster.clear_firewall()

        # Unmount all clients, we are about to blow away the filesystem
        for mount in self.mounts:
            if mount.is_mounted():
                mount.umount_wait(force=True)
        self._save_mount_details()

        # To avoid any issues with e.g. unlink bugs, we destroy and recreate
        # the filesystem rather than just doing a rm -rf of files
        self.mds_cluster.delete_all_filesystems()
        self.mds_cluster.mds_restart()  # to reset any run-time configs, etc.
        self.fs = None  # is now invalid!
        self.backup_fs = None
        self.recovery_fs = None

        # In case anything is in the OSD blocklist, clear it out. This is to avoid
        # the OSD map changing in the background (due to blocklist expiry) while tests run.
        try:
            self.mds_cluster.mon_manager.run_cluster_cmd(args="osd blocklist clear")
        except CommandFailedError:
            # Fallback for older Ceph clusters
            blocklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd("osd",
                "dump", "--format=json-pretty"))['blocklist']
            log.info("Removing {0} blocklist entries".format(len(blocklist)))
            for addr, blocklisted_at in blocklist.items():
                self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blocklist", "rm", addr)

        client_mount_ids = [m.client_id for m in self.mounts]
        # In case there were any extra auth identities around from a previous
        # test, delete them
        for entry in self.auth_list():
            ent_type, ent_id = entry['entity'].split(".")
            if ent_type == "client" and ent_id not in client_mount_ids and not (ent_id == "admin" or ent_id[:6] == 'mirror'):
                self.mds_cluster.mon_manager.raw_cluster_cmd("auth", "del", entry['entity'])

        if self.REQUIRE_FILESYSTEM:
            self.fs = self.mds_cluster.newfs(create=True)

            # In case some test messed with auth caps, reset them
            for client_id in client_mount_ids:
                cmd = ['auth', 'caps', f'client.{client_id}', 'mon', 'allow r',
                       'osd', f'allow rw pool={self.fs.get_data_pool_name()}',
                       'mds', 'allow']

                if self.run_cluster_cmd_result(cmd) == 0:
                    continue

                cmd[1] = 'add'
                if self.run_cluster_cmd_result(cmd) != 0:
                    raise RuntimeError(f'Failed to create new client {cmd[2]}')

            # wait for ranks to become active
            self.fs.wait_for_daemons()

            # Mount the requested number of clients
            for i in range(0, self.CLIENTS_REQUIRED):
                self.mounts[i].mount_wait()

        if self.REQUIRE_BACKUP_FILESYSTEM:
            if not self.REQUIRE_FILESYSTEM:
                self.skipTest("backup filesystem requires a primary filesystem as well")
            self.fs.mon_manager.raw_cluster_cmd('fs', 'flag', 'set',
                                                'enable_multiple', 'true',
                                                '--yes-i-really-mean-it')
            self.backup_fs = self.mds_cluster.newfs(name="backup_fs")
            self.backup_fs.wait_for_daemons()

        if self.REQUIRE_RECOVERY_FILESYSTEM:
            if not self.REQUIRE_FILESYSTEM:
                self.skipTest("Recovery filesystem requires a primary filesystem as well")
            # After Octopus is EOL, we can remove this setting:
            self.fs.mon_manager.raw_cluster_cmd('fs', 'flag', 'set',
                                                'enable_multiple', 'true',
                                                '--yes-i-really-mean-it')
            self.recovery_fs = self.mds_cluster.newfs(name="recovery_fs", create=False)
            self.recovery_fs.set_metadata_overlay(True)
            self.recovery_fs.set_data_pool_name(self.fs.get_data_pool_name())
            self.recovery_fs.create()
            self.recovery_fs.getinfo(refresh=True)
            self.recovery_fs.wait_for_daemons()

        # Load any config settings of interest
        for setting in self.LOAD_SETTINGS:
            setattr(self, setting, float(self.fs.mds_asok(
                ['config', 'get', setting], list(self.mds_cluster.mds_ids)[0]
            )[setting]))

        self.configs_set = set()

    def tearDown(self):
        self.mds_cluster.clear_firewall()
        for m in self.mounts:
            m.teardown()

        # To prevent failover messages during unwind of the ceph task
        self.mds_cluster.delete_all_filesystems()

        for m, md in zip(self.mounts, self._orig_mount_details):
            md.restore(m)

        for subsys, key in self.configs_set:
            self.mds_cluster.clear_ceph_conf(subsys, key)

        return super(CephFSTestCase, self).tearDown()

    def set_conf(self, subsys, key, value):
        self.configs_set.add((subsys, key))
        self.mds_cluster.set_ceph_conf(subsys, key, value)

    def auth_list(self):
        """
        Convenience wrapper on "ceph auth ls"
        """
        return json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd(
            "auth", "ls", "--format=json-pretty"
        ))['auth_dump']
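
    # For orientation, each entry in the returned 'auth_dump' list is a dict along
    # the lines of the following (shape is illustrative, key value elided):
    #
    #   {'entity': 'client.0',
    #    'key': '...',
    #    'caps': {'mds': 'allow', 'mon': 'allow r', 'osd': 'allow rw ...'}}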

    def assert_session_count(self, expected, ls_data=None, mds_id=None):
        if ls_data is None:
            ls_data = self.fs.mds_asok(['session', 'ls'], mds_id=mds_id)

        alive_count = len([s for s in ls_data if s['state'] != 'killing'])

        self.assertEqual(expected, alive_count, "Expected {0} sessions, found {1}".format(
            expected, alive_count
        ))

    def assert_session_state(self, client_id, expected_state):
        self.assertEqual(
            self._session_by_id(
                self.fs.mds_asok(['session', 'ls'])).get(client_id, {'state': None})['state'],
            expected_state)

    def get_session_data(self, client_id):
        return self.get_session(client_id)

    def _session_list(self):
        ls_data = self.fs.mds_asok(['session', 'ls'])
        ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
        return ls_data

    def get_session(self, client_id, session_ls=None):
        if session_ls is None:
            session_ls = self.fs.mds_asok(['session', 'ls'])

        return self._session_by_id(session_ls)[client_id]

    def _session_by_id(self, session_ls):
        return dict([(s['id'], s) for s in session_ls])
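
    # Typical usage of the session helpers in a test body (a sketch; the global id
    # lookup assumes the CephFSMount.get_global_id() helper):
    #
    #   client_id = self.mount_a.get_global_id()
    #   self.assert_session_state(client_id, "open")
    #   self.assert_session_count(len(self.mounts))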

    def perf_dump(self, rank=None, status=None):
        return self.fs.rank_asok(['perf', 'dump'], rank=rank, status=status)

    def wait_until_evicted(self, client_id, timeout=30):
        def is_client_evicted():
            ls = self._session_list()
            for s in ls:
                if s['id'] == client_id:
                    return False
            return True
        self.wait_until_true(is_client_evicted, timeout)
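
    # Usage sketch (hypothetical test body): evict a client through the MDS admin
    # socket, then block until its session is gone:
    #
    #   gid = self.mount_b.get_global_id()
    #   self.fs.mds_asok(['session', 'evict', "%s" % gid])
    #   self.wait_until_evicted(gid, timeout=60)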

    def wait_for_daemon_start(self, daemon_ids=None):
        """
        Wait until all the daemons appear in the FSMap, either assigned
        MDS ranks or in the list of standbys
        """
        def get_daemon_names():
            return [info['name'] for info in self.mds_cluster.status().get_all()]

        if daemon_ids is None:
            daemon_ids = self.mds_cluster.mds_ids

        try:
            self.wait_until_true(
                lambda: set(daemon_ids) & set(get_daemon_names()) == set(daemon_ids),
                timeout=30
            )
        except RuntimeError:
            log.warning("Timeout waiting for daemons {0}, while we have {1}".format(
                daemon_ids, get_daemon_names()
            ))
            raise
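
    # Typical call site (sketch): after bouncing the MDS daemons, wait for them to
    # register in the FSMap before driving the next step of a test:
    #
    #   self.mds_cluster.mds_restart()
    #   self.wait_for_daemon_start()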

    def delete_mds_coredump(self, daemon_id):
        # Delete the coredump file, otherwise teuthology.internal.coredump will
        # catch it later and treat it as a failure.
        core_pattern = self.mds_cluster.mds_daemons[daemon_id].remote.sh(
            "sudo sysctl -n kernel.core_pattern")
        core_dir = os.path.dirname(core_pattern.strip())
        if core_dir:  # Non-default core_pattern with a directory in it
            # We have seen a core_pattern that looks like it's from teuthology's coredump
            # task, so proceed to clear out the core file
            if core_dir[0] == '|':
                log.info("Piped core dumps to program {0}, skipping cleanup".format(core_dir[1:]))
                return

            log.info("Clearing core from directory: {0}".format(core_dir))

            # Verify that we see the expected single coredump
            ls_output = self.mds_cluster.mds_daemons[daemon_id].remote.sh([
                "cd", core_dir, run.Raw('&&'),
                "sudo", "ls", run.Raw('|'), "sudo", "xargs", "file"
            ])
            cores = [l.partition(":")[0]
                     for l in ls_output.strip().split("\n")
                     if re.match(r'.*ceph-mds.* -i +{0}'.format(daemon_id), l)]

            log.info("Enumerated cores: {0}".format(cores))
            self.assertEqual(len(cores), 1)

            log.info("Found core file {0}, deleting it".format(cores[0]))

            self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
                "cd", core_dir, run.Raw('&&'), "sudo", "rm", "-f", cores[0]
            ])
        else:
            log.info("No core_pattern directory set, nothing to clear (internal.coredump not enabled?)")

    def _get_subtrees(self, status=None, rank=None, path=None):
        if path is None:
            path = "/"
        try:
            with contextutil.safe_while(sleep=1, tries=3) as proceed:
                while proceed():
                    try:
                        if rank == "all":
                            subtrees = []
                            for r in self.fs.get_ranks(status=status):
                                s = self.fs.rank_asok(["get", "subtrees"], status=status, rank=r['rank'])
                                s = filter(lambda s: s['auth_first'] == r['rank'] and s['auth_second'] == -2, s)
                                subtrees += s
                        else:
                            subtrees = self.fs.rank_asok(["get", "subtrees"], status=status, rank=rank)
                        subtrees = filter(lambda s: s['dir']['path'].startswith(path), subtrees)
                        return list(subtrees)
                    except CommandFailedError as e:
                        # Sometimes we get transient errors
                        if e.exitstatus == 22:
                            pass
                        else:
                            raise
        except contextutil.MaxWhileTries as e:
            raise RuntimeError(f"could not get subtree state from rank {rank}") from e

    def _wait_subtrees(self, test, status=None, rank=None, timeout=30, sleep=2, action=None, path=None):
        test = sorted(test)
        try:
            with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
                while proceed():
                    subtrees = self._get_subtrees(status=status, rank=rank, path=path)
                    filtered = sorted([(s['dir']['path'], s['auth_first']) for s in subtrees])
                    log.info("%s =?= %s", filtered, test)
                    if filtered == test:
                        # Confirm export_pin in output is correct:
                        for s in subtrees:
                            if s['export_pin_target'] >= 0:
                                self.assertTrue(s['export_pin_target'] == s['auth_first'])
                        return subtrees
                    if action is not None:
                        action()
        except contextutil.MaxWhileTries as e:
            raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e
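
    # Usage sketch (hypothetical directory names): pin a directory to a rank via
    # the ceph.dir.pin vxattr, then wait for the expected (path, auth rank) pairs:
    #
    #   status = self.fs.status()
    #   self.mount_a.setfattr("dir1", "ceph.dir.pin", "1")
    #   self._wait_subtrees([('/dir1', 1)], status=status, rank=1)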

    def _wait_until_scrub_complete(self, path="/", recursive=True, timeout=100):
        out_json = self.fs.run_scrub(["start", path] + (["recursive"] if recursive else []))
        if not self.fs.wait_until_scrub_complete(tag=out_json["scrub_tag"],
                                                 sleep=10, timeout=timeout):
            log.info("timed out waiting for scrub to complete")

    def _wait_distributed_subtrees(self, count, status=None, rank=None, path=None):
        try:
            with contextutil.safe_while(sleep=5, tries=20) as proceed:
                while proceed():
                    subtrees = self._get_subtrees(status=status, rank=rank, path=path)
                    subtrees = list(filter(lambda s: s['distributed_ephemeral_pin'] == True and
                                           s['auth_first'] == s['export_pin_target'],
                                           subtrees))
                    log.info(f"len={len(subtrees)} {subtrees}")
                    if len(subtrees) >= count:
                        return subtrees
        except contextutil.MaxWhileTries as e:
            raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e

    def _wait_random_subtrees(self, count, status=None, rank=None, path=None):
        try:
            with contextutil.safe_while(sleep=5, tries=20) as proceed:
                while proceed():
                    subtrees = self._get_subtrees(status=status, rank=rank, path=path)
                    subtrees = list(filter(lambda s: s['random_ephemeral_pin'] == True and
                                           s['auth_first'] == s['export_pin_target'],
                                           subtrees))
                    log.info(f"len={len(subtrees)} {subtrees}")
                    if len(subtrees) >= count:
                        return subtrees
        except contextutil.MaxWhileTries as e:
            raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e

    def run_cluster_cmd(self, cmd):
        if isinstance(cmd, str):
            cmd = shlex_split(cmd)
        return self.fs.mon_manager.raw_cluster_cmd(*cmd)

    def run_cluster_cmd_result(self, cmd):
        if isinstance(cmd, str):
            cmd = shlex_split(cmd)
        return self.fs.mon_manager.raw_cluster_cmd_result(*cmd)
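
    # Both helpers accept either a pre-split argument list or a single string
    # (illustrative commands; the default filesystem created by setUp is assumed
    # to be named "cephfs"):
    #
    #   self.run_cluster_cmd('fs set cephfs max_mds 2')
    #   if self.run_cluster_cmd_result(['fs', 'get', 'cephfs']) != 0:
    #       self.skipTest('no filesystem named cephfs')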

    def create_client(self, client_id, moncap=None, osdcap=None, mdscap=None):
        if not (moncap or osdcap or mdscap):
            if self.fs:
                return self.fs.authorize(client_id, ('/', 'rw'))
            else:
                raise RuntimeError('no caps were passed and the default FS '
                                   'is not created yet to allow client auth '
                                   'for it.')

        cmd = ['auth', 'add', f'client.{client_id}']
        if moncap:
            cmd += ['mon', moncap]
        if osdcap:
            cmd += ['osd', osdcap]
        if mdscap:
            cmd += ['mds', mdscap]

        self.run_cluster_cmd(cmd)
        return self.run_cluster_cmd(f'auth get client.{client_id}')
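
    # Usage sketch (hypothetical client names and cap strings): either rely on the
    # default authorization against self.fs, or pass explicit caps:
    #
    #   self.create_client('qa_user')  # equivalent to self.fs.authorize('qa_user', ('/', 'rw'))
    #   self.create_client('qa_reader', moncap='allow r',
    #                      osdcap='allow r tag cephfs data=cephfs',
    #                      mdscap='allow r')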