]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | import json |
2 | import logging | |
7c673cae FG |
3 | import os |
4 | import re | |
7c673cae | 5 | |
f67539c2 | 6 | from shlex import split as shlex_split |
f67539c2 TL |
7 | |
8 | from tasks.ceph_test_case import CephTestCase | |
7c673cae | 9 | |
f6b5b4d7 | 10 | from teuthology import contextutil |
7c673cae | 11 | from teuthology.orchestra import run |
20effc67 | 12 | from teuthology.exceptions import CommandFailedError |
# Module-level logger for this test-support module.
log = logging.getLogger(__name__)
def classhook(m):
    """
    Class decorator factory: invoke the attribute named *m* on the decorated
    class at class-definition time (e.g. a @classmethod that registers or
    generates tests), then return the class unchanged.
    """
    def _invoke_hook(cls):
        getattr(cls, m)()
        return cls
    return _invoke_hook
def for_teuthology(f):
    """
    Decorator marking the wrapped function as teuthology-only by setting an
    "is_for_teuthology" attribute on it; the function itself is untouched.
    """
    setattr(f, 'is_for_teuthology', True)
    return f
def needs_trimming(f):
    """
    Mark fn as requiring a client capable of trimming its cache (i.e. for ceph-fuse
    this means it needs to be able to run as root, currently) by setting a
    "needs_trimming" attribute on the wrapped function.
    """
    setattr(f, 'needs_trimming', True)
    return f
class MountDetails():
    """
    Snapshot of the identifying fields of a mount object, so that tests that
    mutate a mount can have its original details restored afterwards.
    """

    # The mount-object attributes that are stashed and restored.
    _FIELDS = ('client_id', 'client_keyring_path', 'client_remote',
               'cephfs_name', 'cephfs_mntpt', 'hostfs_mntpt')

    def __init__(self, mntobj):
        # Copy each field of interest from the mount object onto self.
        for name in self._FIELDS:
            setattr(self, name, getattr(mntobj, name))

    def restore(self, mntobj):
        # Write the stashed values back onto the mount object.
        for name in self._FIELDS:
            setattr(mntobj, name, getattr(self, name))
class CephFSTestCase(CephTestCase):
    """
    Test case for Ceph FS, requires caller to populate Filesystem and Mounts,
    into the fs, mount_a, mount_b class attributes (setting mount_b is optional)

    Handles resetting the cluster under test between tests.
    """

    # FIXME weird explicit naming
    # mount_a/mount_b are populated from self.mounts in setUp().
    mount_a = None
    mount_b = None
    recovery_mount = None

    # Declarative test requirements: subclasses should override these to indicate
    # their special needs.  If not met, tests will be skipped.
    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1
    REQUIRE_ONE_CLIENT_REMOTE = False

    # Whether to create the default filesystem during setUp
    REQUIRE_FILESYSTEM = True

    # Create a backup filesystem if required.
    # Requires REQUIRE_FILESYSTEM to be enabled as well.
    REQUIRE_BACKUP_FILESYSTEM = False

    # Names of MDS config settings to load as float attributes in setUp().
    LOAD_SETTINGS = []  # type: ignore
f67539c2 TL |
86 | def _save_mount_details(self): |
87 | """ | |
88 | XXX: Tests may change details of mount objects, so let's stash them so | |
89 | that these details are restored later to ensure smooth setUps and | |
90 | tearDowns for upcoming tests. | |
91 | """ | |
92 | self._orig_mount_details = [MountDetails(m) for m in self.mounts] | |
93 | log.info(self._orig_mount_details) | |
94 | ||
    def _remove_blocklist(self):
        # In case anything is in the OSD blocklist list, clear it out. This is to avoid
        # the OSD map changing in the background (due to blocklist expiry) while tests run.
        try:
            # Preferred path: single mon command available on newer Ceph.
            self.mds_cluster.mon_manager.run_cluster_cmd(args="osd blocklist clear")
        except CommandFailedError:
            # Fallback for older Ceph cluster: read the osd dump and remove
            # each blocklist entry individually.
            try:
                blocklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd(
                    "osd", "dump", "--format=json-pretty"))['blocklist']
                log.info(f"Removing {len(blocklist)} blocklist entries")
                for addr, blocklisted_at in blocklist.items():
                    self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blocklist", "rm", addr)
            except KeyError:
                # Fallback for more older Ceph clusters, who will use 'blacklist' instead.
                blacklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd(
                    "osd", "dump", "--format=json-pretty"))['blacklist']
                log.info(f"Removing {len(blacklist)} blacklist entries")
                for addr, blocklisted_at in blacklist.items():
                    self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "rm", addr)
    def setUp(self):
        """
        Reset the cluster to a known-clean state before each test: verify
        declarative requirements (skipping if unmet), unmount all clients,
        destroy and recreate filesystems, clear blocklists and stray auth
        identities, then remount the required clients.
        """
        super(CephFSTestCase, self).setUp()

        # Pool deletion is needed by the FS teardown/recreate below.
        self.config_set('mon', 'mon_allow_pool_delete', True)

        if len(self.mds_cluster.mds_ids) < self.MDSS_REQUIRED:
            self.skipTest("Only have {0} MDSs, require {1}".format(
                len(self.mds_cluster.mds_ids), self.MDSS_REQUIRED
            ))

        if len(self.mounts) < self.CLIENTS_REQUIRED:
            self.skipTest("Only have {0} clients, require {1}".format(
                len(self.mounts), self.CLIENTS_REQUIRED
            ))

        if self.REQUIRE_ONE_CLIENT_REMOTE:
            if self.mounts[0].client_remote.hostname in self.mds_cluster.get_mds_hostnames():
                self.skipTest("Require first client to be on separate server from MDSs")

        # Create friendly mount_a, mount_b attrs
        for i in range(0, self.CLIENTS_REQUIRED):
            setattr(self, "mount_{0}".format(chr(ord('a') + i)), self.mounts[i])

        self.mds_cluster.clear_firewall()

        # Unmount all clients, we are about to blow away the filesystem
        for mount in self.mounts:
            if mount.is_mounted():
                mount.umount_wait(force=True)
        self._save_mount_details()

        # To avoid any issues with e.g. unlink bugs, we destroy and recreate
        # the filesystem rather than just doing a rm -rf of files
        self.mds_cluster.delete_all_filesystems()
        self.mds_cluster.mds_restart()  # to reset any run-time configs, etc.
        self.fs = None  # is now invalid!
        self.backup_fs = None
        self.recovery_fs = None

        self._remove_blocklist()

        client_mount_ids = [m.client_id for m in self.mounts]
        # In case there were any extra auth identities around from a previous
        # test, delete them
        for entry in self.auth_list():
            ent_type, ent_id = entry['entity'].split(".")
            if ent_type == "client" and ent_id not in client_mount_ids and not (ent_id == "admin" or ent_id[:6] == 'mirror'):
                self.mds_cluster.mon_manager.raw_cluster_cmd("auth", "del", entry['entity'])

        if self.REQUIRE_FILESYSTEM:
            self.fs = self.mds_cluster.newfs(create=True)

            # In case some test messed with auth caps, reset them
            for client_id in client_mount_ids:
                cmd = ['auth', 'caps', f'client.{client_id}', 'mon', 'allow r',
                       'osd', f'allow rw tag cephfs data={self.fs.name}',
                       'mds', 'allow']

                # NOTE(review): this `break` stops resetting caps after the
                # first client whose 'auth caps' succeeds, leaving the caps of
                # the remaining clients untouched — `continue` looks like it
                # may have been intended. Confirm before changing.
                if self.run_cluster_cmd_result(cmd) == 0:
                    break

                # 'auth caps' failed => identity does not exist; create it.
                cmd[1] = 'add'
                if self.run_cluster_cmd_result(cmd) != 0:
                    raise RuntimeError(f'Failed to create new client {cmd[2]}')

            # wait for ranks to become active
            self.fs.wait_for_daemons()

            # Mount the requested number of clients
            for i in range(0, self.CLIENTS_REQUIRED):
                self.mounts[i].mount_wait()

        if self.REQUIRE_BACKUP_FILESYSTEM:
            if not self.REQUIRE_FILESYSTEM:
                self.skipTest("backup filesystem requires a primary filesystem as well")
            self.fs.mon_manager.raw_cluster_cmd('fs', 'flag', 'set',
                                                'enable_multiple', 'true',
                                                '--yes-i-really-mean-it')
            self.backup_fs = self.mds_cluster.newfs(name="backup_fs")
            self.backup_fs.wait_for_daemons()

        # Load any config settings of interest as float attributes on self.
        for setting in self.LOAD_SETTINGS:
            setattr(self, setting, float(self.fs.mds_asok(
                ['config', 'get', setting], list(self.mds_cluster.mds_ids)[0]
            )[setting]))

        # Tracks (subsys, key) pairs set via set_conf() for tearDown cleanup.
        self.configs_set = set()
205 | def tearDown(self): | |
7c673cae FG |
206 | self.mds_cluster.clear_firewall() |
207 | for m in self.mounts: | |
208 | m.teardown() | |
209 | ||
f67539c2 TL |
210 | # To prevent failover messages during Unwind of ceph task |
211 | self.mds_cluster.delete_all_filesystems() | |
212 | ||
213 | for m, md in zip(self.mounts, self._orig_mount_details): | |
214 | md.restore(m) | |
7c673cae FG |
215 | |
216 | for subsys, key in self.configs_set: | |
217 | self.mds_cluster.clear_ceph_conf(subsys, key) | |
218 | ||
9f95a23c TL |
219 | return super(CephFSTestCase, self).tearDown() |
220 | ||
7c673cae FG |
221 | def set_conf(self, subsys, key, value): |
222 | self.configs_set.add((subsys, key)) | |
223 | self.mds_cluster.set_ceph_conf(subsys, key, value) | |
224 | ||
225 | def auth_list(self): | |
226 | """ | |
c07f9fc5 | 227 | Convenience wrapper on "ceph auth ls" |
7c673cae FG |
228 | """ |
229 | return json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd( | |
c07f9fc5 | 230 | "auth", "ls", "--format=json-pretty" |
7c673cae FG |
231 | ))['auth_dump'] |
232 | ||
233 | def assert_session_count(self, expected, ls_data=None, mds_id=None): | |
234 | if ls_data is None: | |
235 | ls_data = self.fs.mds_asok(['session', 'ls'], mds_id=mds_id) | |
236 | ||
31f18b77 FG |
237 | alive_count = len([s for s in ls_data if s['state'] != 'killing']) |
238 | ||
239 | self.assertEqual(expected, alive_count, "Expected {0} sessions, found {1}".format( | |
240 | expected, alive_count | |
7c673cae FG |
241 | )) |
242 | ||
243 | def assert_session_state(self, client_id, expected_state): | |
244 | self.assertEqual( | |
245 | self._session_by_id( | |
246 | self.fs.mds_asok(['session', 'ls'])).get(client_id, {'state': None})['state'], | |
247 | expected_state) | |
248 | ||
249 | def get_session_data(self, client_id): | |
250 | return self._session_by_id(client_id) | |
251 | ||
252 | def _session_list(self): | |
253 | ls_data = self.fs.mds_asok(['session', 'ls']) | |
254 | ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']] | |
255 | return ls_data | |
256 | ||
257 | def get_session(self, client_id, session_ls=None): | |
258 | if session_ls is None: | |
259 | session_ls = self.fs.mds_asok(['session', 'ls']) | |
260 | ||
261 | return self._session_by_id(session_ls)[client_id] | |
262 | ||
263 | def _session_by_id(self, session_ls): | |
264 | return dict([(s['id'], s) for s in session_ls]) | |
265 | ||
adb31ebb TL |
266 | def perf_dump(self, rank=None, status=None): |
267 | return self.fs.rank_asok(['perf', 'dump'], rank=rank, status=status) | |
268 | ||
92f5a8d4 TL |
269 | def wait_until_evicted(self, client_id, timeout=30): |
270 | def is_client_evicted(): | |
271 | ls = self._session_list() | |
272 | for s in ls: | |
273 | if s['id'] == client_id: | |
274 | return False | |
275 | return True | |
276 | self.wait_until_true(is_client_evicted, timeout) | |
277 | ||
7c673cae FG |
278 | def wait_for_daemon_start(self, daemon_ids=None): |
279 | """ | |
280 | Wait until all the daemons appear in the FSMap, either assigned | |
281 | MDS ranks or in the list of standbys | |
282 | """ | |
283 | def get_daemon_names(): | |
284 | return [info['name'] for info in self.mds_cluster.status().get_all()] | |
285 | ||
286 | if daemon_ids is None: | |
287 | daemon_ids = self.mds_cluster.mds_ids | |
288 | ||
289 | try: | |
290 | self.wait_until_true( | |
291 | lambda: set(daemon_ids) & set(get_daemon_names()) == set(daemon_ids), | |
292 | timeout=30 | |
293 | ) | |
294 | except RuntimeError: | |
e306af50 | 295 | log.warning("Timeout waiting for daemons {0}, while we have {1}".format( |
7c673cae FG |
296 | daemon_ids, get_daemon_names() |
297 | )) | |
298 | raise | |
299 | ||
11fdf7f2 TL |
300 | def delete_mds_coredump(self, daemon_id): |
301 | # delete coredump file, otherwise teuthology.internal.coredump will | |
302 | # catch it later and treat it as a failure. | |
e306af50 TL |
303 | core_pattern = self.mds_cluster.mds_daemons[daemon_id].remote.sh( |
304 | "sudo sysctl -n kernel.core_pattern") | |
305 | core_dir = os.path.dirname(core_pattern.strip()) | |
11fdf7f2 TL |
306 | if core_dir: # Non-default core_pattern with a directory in it |
307 | # We have seen a core_pattern that looks like it's from teuthology's coredump | |
308 | # task, so proceed to clear out the core file | |
f6b5b4d7 TL |
309 | if core_dir[0] == '|': |
310 | log.info("Piped core dumps to program {0}, skip cleaning".format(core_dir[1:])) | |
311 | return; | |
312 | ||
11fdf7f2 TL |
313 | log.info("Clearing core from directory: {0}".format(core_dir)) |
314 | ||
315 | # Verify that we see the expected single coredump | |
e306af50 | 316 | ls_output = self.mds_cluster.mds_daemons[daemon_id].remote.sh([ |
11fdf7f2 TL |
317 | "cd", core_dir, run.Raw('&&'), |
318 | "sudo", "ls", run.Raw('|'), "sudo", "xargs", "file" | |
e306af50 | 319 | ]) |
11fdf7f2 | 320 | cores = [l.partition(":")[0] |
e306af50 | 321 | for l in ls_output.strip().split("\n") |
11fdf7f2 TL |
322 | if re.match(r'.*ceph-mds.* -i +{0}'.format(daemon_id), l)] |
323 | ||
324 | log.info("Enumerated cores: {0}".format(cores)) | |
325 | self.assertEqual(len(cores), 1) | |
326 | ||
327 | log.info("Found core file {0}, deleting it".format(cores[0])) | |
328 | ||
329 | self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[ | |
330 | "cd", core_dir, run.Raw('&&'), "sudo", "rm", "-f", cores[0] | |
331 | ]) | |
7c673cae | 332 | else: |
11fdf7f2 | 333 | log.info("No core_pattern directory set, nothing to clear (internal.coredump not enabled?)") |
81eedcae | 334 | |
    def _get_subtrees(self, status=None, rank=None, path=None):
        """
        Fetch the subtree list from the MDS, filtered to entries whose dir
        path starts with *path* (default "/"). With rank == "all", gathers
        subtrees from every rank, keeping only those each rank is the
        authority for (auth_first == rank, auth_second == -2). Retries up to
        3 times on transient EINVAL failures; raises RuntimeError if the
        state could not be read.
        """
        if path is None:
            path = "/"
        try:
            with contextutil.safe_while(sleep=1, tries=3) as proceed:
                while proceed():
                    try:
                        if rank == "all":
                            subtrees = []
                            for r in self.fs.get_ranks(status=status):
                                s = self.fs.rank_asok(["get", "subtrees"], status=status, rank=r['rank'])
                                s = filter(lambda s: s['auth_first'] == r['rank'] and s['auth_second'] == -2, s)
                                subtrees += s
                        else:
                            subtrees = self.fs.rank_asok(["get", "subtrees"], status=status, rank=rank)
                        subtrees = filter(lambda s: s['dir']['path'].startswith(path), subtrees)
                        return list(subtrees)
                    except CommandFailedError as e:
                        # Sometimes we get transient errors
                        if e.exitstatus == 22:
                            pass
                        else:
                            raise
        except contextutil.MaxWhileTries as e:
            raise RuntimeError(f"could not get subtree state from rank {rank}") from e
361 | def _wait_subtrees(self, test, status=None, rank=None, timeout=30, sleep=2, action=None, path=None): | |
81eedcae | 362 | test = sorted(test) |
f6b5b4d7 TL |
363 | try: |
364 | with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed: | |
365 | while proceed(): | |
366 | subtrees = self._get_subtrees(status=status, rank=rank, path=path) | |
367 | filtered = sorted([(s['dir']['path'], s['auth_first']) for s in subtrees]) | |
368 | log.info("%s =?= %s", filtered, test) | |
369 | if filtered == test: | |
370 | # Confirm export_pin in output is correct: | |
371 | for s in subtrees: | |
f67539c2 TL |
372 | if s['export_pin_target'] >= 0: |
373 | self.assertTrue(s['export_pin_target'] == s['auth_first']) | |
f6b5b4d7 TL |
374 | return subtrees |
375 | if action is not None: | |
376 | action() | |
377 | except contextutil.MaxWhileTries as e: | |
378 | raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e | |
e306af50 | 379 | |
f67539c2 TL |
380 | def _wait_until_scrub_complete(self, path="/", recursive=True, timeout=100): |
381 | out_json = self.fs.run_scrub(["start", path] + ["recursive"] if recursive else []) | |
382 | if not self.fs.wait_until_scrub_complete(tag=out_json["scrub_tag"], | |
383 | sleep=10, timeout=timeout): | |
384 | log.info("timed out waiting for scrub to complete") | |
e306af50 | 385 | |
f6b5b4d7 TL |
386 | def _wait_distributed_subtrees(self, count, status=None, rank=None, path=None): |
387 | try: | |
388 | with contextutil.safe_while(sleep=5, tries=20) as proceed: | |
389 | while proceed(): | |
390 | subtrees = self._get_subtrees(status=status, rank=rank, path=path) | |
f67539c2 TL |
391 | subtrees = list(filter(lambda s: s['distributed_ephemeral_pin'] == True and |
392 | s['auth_first'] == s['export_pin_target'], | |
393 | subtrees)) | |
f6b5b4d7 TL |
394 | log.info(f"len={len(subtrees)} {subtrees}") |
395 | if len(subtrees) >= count: | |
396 | return subtrees | |
397 | except contextutil.MaxWhileTries as e: | |
398 | raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e | |
399 | ||
400 | def _wait_random_subtrees(self, count, status=None, rank=None, path=None): | |
401 | try: | |
402 | with contextutil.safe_while(sleep=5, tries=20) as proceed: | |
403 | while proceed(): | |
404 | subtrees = self._get_subtrees(status=status, rank=rank, path=path) | |
f67539c2 TL |
405 | subtrees = list(filter(lambda s: s['random_ephemeral_pin'] == True and |
406 | s['auth_first'] == s['export_pin_target'], | |
407 | subtrees)) | |
f6b5b4d7 TL |
408 | log.info(f"len={len(subtrees)} {subtrees}") |
409 | if len(subtrees) >= count: | |
410 | return subtrees | |
411 | except contextutil.MaxWhileTries as e: | |
412 | raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e | |
f67539c2 TL |
413 | |
414 | def run_cluster_cmd(self, cmd): | |
415 | if isinstance(cmd, str): | |
416 | cmd = shlex_split(cmd) | |
417 | return self.fs.mon_manager.raw_cluster_cmd(*cmd) | |
418 | ||
419 | def run_cluster_cmd_result(self, cmd): | |
420 | if isinstance(cmd, str): | |
421 | cmd = shlex_split(cmd) | |
422 | return self.fs.mon_manager.raw_cluster_cmd_result(*cmd) | |
423 | ||
424 | def create_client(self, client_id, moncap=None, osdcap=None, mdscap=None): | |
425 | if not (moncap or osdcap or mdscap): | |
426 | if self.fs: | |
427 | return self.fs.authorize(client_id, ('/', 'rw')) | |
428 | else: | |
429 | raise RuntimeError('no caps were passed and the default FS ' | |
430 | 'is not created yet to allow client auth ' | |
431 | 'for it.') | |
432 | ||
433 | cmd = ['auth', 'add', f'client.{client_id}'] | |
434 | if moncap: | |
435 | cmd += ['mon', moncap] | |
436 | if osdcap: | |
437 | cmd += ['osd', osdcap] | |
438 | if mdscap: | |
439 | cmd += ['mds', mdscap] | |
440 | ||
441 | self.run_cluster_cmd(cmd) | |
442 | return self.run_cluster_cmd(f'auth get {self.client_name}') |