]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | import json |
2 | import logging | |
7c673cae FG |
3 | from tasks.ceph_test_case import CephTestCase |
4 | import os | |
5 | import re | |
7c673cae FG |
6 | |
7 | from tasks.cephfs.fuse_mount import FuseMount | |
8 | ||
f6b5b4d7 | 9 | from teuthology import contextutil |
7c673cae FG |
10 | from teuthology.orchestra import run |
11 | from teuthology.orchestra.run import CommandFailedError | |
e306af50 | 12 | from teuthology.contextutil import safe_while |
7c673cae FG |
13 | |
14 | ||
15 | log = logging.getLogger(__name__) | |
16 | ||
17 | ||
def for_teuthology(f):
    """
    Decorator marking a test as teuthology-only by tagging the function
    with an ``is_for_teuthology`` attribute (read by the test scheduler).
    """
    setattr(f, "is_for_teuthology", True)
    return f
24 | ||
25 | ||
def needs_trimming(f):
    """
    Mark fn as requiring a client capable of trimming its cache (i.e. for ceph-fuse
    this means it needs to be able to run as root, currently)
    """
    setattr(f, "needs_trimming", True)
    return f
33 | ||
34 | ||
class CephFSTestCase(CephTestCase):
    """
    Test case for Ceph FS, requires caller to populate Filesystem and Mounts,
    into the fs, mount_a, mount_b class attributes (setting mount_b is optional)

    Handles resetting the cluster under test between tests.
    """

    # FIXME weird explicit naming
    mount_a = None
    mount_b = None
    recovery_mount = None

    # Declarative test requirements: subclasses should override these to indicate
    # their special needs. If not met, tests will be skipped.
    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1
    REQUIRE_KCLIENT_REMOTE = False
    REQUIRE_ONE_CLIENT_REMOTE = False

    # Whether to create the default filesystem during setUp
    REQUIRE_FILESYSTEM = True

    # requires REQUIRE_FILESYSTEM = True
    REQUIRE_RECOVERY_FILESYSTEM = False

    # Config setting names to read (via "config get" on an MDS asok) during
    # setUp and expose as float attributes of the same name on the test case.
    LOAD_SETTINGS = []  # type: ignore

    def setUp(self):
        """
        Reset the cluster to a known-clean state before each test: check
        declared requirements (skipping if unmet), unmount all clients,
        destroy and recreate the filesystem(s), clear blacklists and stale
        auth entities, then remount the requested number of clients.
        """
        super(CephFSTestCase, self).setUp()

        if len(self.mds_cluster.mds_ids) < self.MDSS_REQUIRED:
            self.skipTest("Only have {0} MDSs, require {1}".format(
                len(self.mds_cluster.mds_ids), self.MDSS_REQUIRED
            ))

        if len(self.mounts) < self.CLIENTS_REQUIRED:
            self.skipTest("Only have {0} clients, require {1}".format(
                len(self.mounts), self.CLIENTS_REQUIRED
            ))

        if self.REQUIRE_KCLIENT_REMOTE:
            if not isinstance(self.mounts[0], FuseMount) or not isinstance(self.mounts[1], FuseMount):
                # kclient kill() power cycles nodes, so requires clients to each be on
                # their own node
                if self.mounts[0].client_remote.hostname == self.mounts[1].client_remote.hostname:
                    self.skipTest("kclient clients must be on separate nodes")

        if self.REQUIRE_ONE_CLIENT_REMOTE:
            if self.mounts[0].client_remote.hostname in self.mds_cluster.get_mds_hostnames():
                self.skipTest("Require first client to be on separate server from MDSs")

        # Create friendly mount_a, mount_b attrs
        for i in range(0, self.CLIENTS_REQUIRED):
            setattr(self, "mount_{0}".format(chr(ord('a') + i)), self.mounts[i])

        self.mds_cluster.clear_firewall()

        # Unmount all clients, we are about to blow away the filesystem
        for mount in self.mounts:
            if mount.is_mounted():
                mount.umount_wait(force=True)

        # To avoid any issues with e.g. unlink bugs, we destroy and recreate
        # the filesystem rather than just doing a rm -rf of files
        self.mds_cluster.delete_all_filesystems()
        self.mds_cluster.mds_restart()  # to reset any run-time configs, etc.
        self.fs = None  # is now invalid!
        self.recovery_fs = None

        # In case anything is in the OSD blacklist list, clear it out. This is to avoid
        # the OSD map changing in the background (due to blacklist expiry) while tests run.
        try:
            self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "clear")
        except CommandFailedError:
            # Fallback for older Ceph cluster
            blacklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd("osd",
                                   "dump", "--format=json-pretty"))['blacklist']
            log.info("Removing {0} blacklist entries".format(len(blacklist)))
            for addr, blacklisted_at in blacklist.items():
                self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "rm", addr)

        client_mount_ids = [m.client_id for m in self.mounts]
        # In case the test changes the IDs of clients, stash them so that we can
        # reset in tearDown
        self._original_client_ids = client_mount_ids
        log.info(client_mount_ids)

        # In case there were any extra auth identities around from a previous
        # test, delete them
        for entry in self.auth_list():
            ent_type, ent_id = entry['entity'].split(".")
            if ent_type == "client" and ent_id not in client_mount_ids and ent_id != "admin":
                self.mds_cluster.mon_manager.raw_cluster_cmd("auth", "del", entry['entity'])

        if self.REQUIRE_FILESYSTEM:
            self.fs = self.mds_cluster.newfs(create=True)

            # In case some test messed with auth caps, reset them
            for client_id in client_mount_ids:
                self.mds_cluster.mon_manager.raw_cluster_cmd_result(
                    'auth', 'caps', "client.{0}".format(client_id),
                    'mds', 'allow',
                    'mon', 'allow r',
                    'osd', 'allow rw pool={0}'.format(self.fs.get_data_pool_name()))

            # wait for ranks to become active
            self.fs.wait_for_daemons()

            # Mount the requested number of clients
            for i in range(0, self.CLIENTS_REQUIRED):
                self.mounts[i].mount_wait()

        if self.REQUIRE_RECOVERY_FILESYSTEM:
            if not self.REQUIRE_FILESYSTEM:
                self.skipTest("Recovery filesystem requires a primary filesystem as well")
            # A second fs requires the multiple-filesystems flag to be set
            self.fs.mon_manager.raw_cluster_cmd('fs', 'flag', 'set',
                                                'enable_multiple', 'true',
                                                '--yes-i-really-mean-it')
            self.recovery_fs = self.mds_cluster.newfs(name="recovery_fs", create=False)
            self.recovery_fs.set_metadata_overlay(True)
            self.recovery_fs.set_data_pool_name(self.fs.get_data_pool_name())
            self.recovery_fs.create()
            self.recovery_fs.getinfo(refresh=True)
            self.recovery_fs.mds_restart()
            self.recovery_fs.wait_for_daemons()

        # Load any config settings of interest
        for setting in self.LOAD_SETTINGS:
            setattr(self, setting, float(self.fs.mds_asok(
                ['config', 'get', setting], list(self.mds_cluster.mds_ids)[0]
            )[setting]))

        # Tracks (subsys, key) pairs set via set_conf(), cleared in tearDown
        self.configs_set = set()

    def tearDown(self):
        """
        Undo per-test cluster state: firewall rules, client mounts, renamed
        client IDs and any ceph.conf settings applied via set_conf().
        """
        self.mds_cluster.clear_firewall()
        for m in self.mounts:
            m.teardown()

        # Restore any client IDs the test may have changed
        for i, m in enumerate(self.mounts):
            m.client_id = self._original_client_ids[i]

        for subsys, key in self.configs_set:
            self.mds_cluster.clear_ceph_conf(subsys, key)

        return super(CephFSTestCase, self).tearDown()

    def set_conf(self, subsys, key, value):
        """
        Set a ceph.conf option, remembering it so tearDown can clear it.
        """
        self.configs_set.add((subsys, key))
        self.mds_cluster.set_ceph_conf(subsys, key, value)

    def auth_list(self):
        """
        Convenience wrapper on "ceph auth ls"
        """
        return json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd(
            "auth", "ls", "--format=json-pretty"
        ))['auth_dump']

    def assert_session_count(self, expected, ls_data=None, mds_id=None):
        """
        Assert that the MDS session list contains `expected` live sessions
        (sessions in the 'killing' state are not counted).
        """
        if ls_data is None:
            ls_data = self.fs.mds_asok(['session', 'ls'], mds_id=mds_id)

        alive_count = len([s for s in ls_data if s['state'] != 'killing'])

        self.assertEqual(expected, alive_count, "Expected {0} sessions, found {1}".format(
            expected, alive_count
        ))

    def assert_session_state(self, client_id, expected_state):
        """
        Assert that the session for `client_id` is in `expected_state`
        (None is compared if the session does not exist).
        """
        self.assertEqual(
            self._session_by_id(
                self.fs.mds_asok(['session', 'ls'])).get(client_id, {'state': None})['state'],
            expected_state)

    def get_session_data(self, client_id):
        """
        Return the session-map entry for `client_id`.

        Bug fix: this previously passed `client_id` straight into
        _session_by_id(), which expects a session *list*, so it always
        raised a TypeError. Fetch the live session list first.
        """
        return self.get_session(client_id)

    def _session_list(self):
        """
        Return the MDS session list, excluding stale and closed sessions.
        """
        ls_data = self.fs.mds_asok(['session', 'ls'])
        ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
        return ls_data

    def get_session(self, client_id, session_ls=None):
        """
        Return the session entry for `client_id`, querying the MDS for the
        session list unless one is supplied. Raises KeyError if absent.
        """
        if session_ls is None:
            session_ls = self.fs.mds_asok(['session', 'ls'])

        return self._session_by_id(session_ls)[client_id]

    def _session_by_id(self, session_ls):
        """
        Index a session list by session id.
        """
        return dict([(s['id'], s) for s in session_ls])

    def wait_until_evicted(self, client_id, timeout=30):
        """
        Block until `client_id` no longer appears in the live session list.
        """
        def is_client_evicted():
            ls = self._session_list()
            for s in ls:
                if s['id'] == client_id:
                    return False
            return True
        self.wait_until_true(is_client_evicted, timeout)

    def wait_for_daemon_start(self, daemon_ids=None):
        """
        Wait until all the daemons appear in the FSMap, either assigned
        MDS ranks or in the list of standbys
        """
        def get_daemon_names():
            return [info['name'] for info in self.mds_cluster.status().get_all()]

        if daemon_ids is None:
            daemon_ids = self.mds_cluster.mds_ids

        try:
            self.wait_until_true(
                lambda: set(daemon_ids) & set(get_daemon_names()) == set(daemon_ids),
                timeout=30
            )
        except RuntimeError:
            log.warning("Timeout waiting for daemons {0}, while we have {1}".format(
                daemon_ids, get_daemon_names()
            ))
            raise

    def delete_mds_coredump(self, daemon_id):
        """
        Delete the single expected MDS coredump on the daemon's node,
        otherwise teuthology.internal.coredump will catch it later and
        treat it as a failure. Asserts exactly one matching core exists.
        """
        core_pattern = self.mds_cluster.mds_daemons[daemon_id].remote.sh(
            "sudo sysctl -n kernel.core_pattern")
        core_dir = os.path.dirname(core_pattern.strip())
        if core_dir:  # Non-default core_pattern with a directory in it
            # We have seen a core_pattern that looks like it's from teuthology's coredump
            # task, so proceed to clear out the core file
            if core_dir[0] == '|':
                log.info("Piped core dumps to program {0}, skip cleaning".format(core_dir[1:]))
                return

            log.info("Clearing core from directory: {0}".format(core_dir))

            # Verify that we see the expected single coredump
            ls_output = self.mds_cluster.mds_daemons[daemon_id].remote.sh([
                "cd", core_dir, run.Raw('&&'),
                "sudo", "ls", run.Raw('|'), "sudo", "xargs", "file"
            ])
            cores = [l.partition(":")[0]
                     for l in ls_output.strip().split("\n")
                     if re.match(r'.*ceph-mds.* -i +{0}'.format(daemon_id), l)]

            log.info("Enumerated cores: {0}".format(cores))
            self.assertEqual(len(cores), 1)

            log.info("Found core file {0}, deleting it".format(cores[0]))

            self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
                "cd", core_dir, run.Raw('&&'), "sudo", "rm", "-f", cores[0]
            ])
        else:
            log.info("No core_pattern directory set, nothing to clear (internal.coredump not enabled?)")

    def _get_subtrees(self, status=None, rank=None, path=None):
        """
        Return the subtrees rooted under `path` (default "/") from the given
        rank, or from every active rank when rank == "all" (keeping only
        subtrees each rank is authoritative for). Retries transient EINVAL
        asok failures; raises RuntimeError after repeated failures.
        """
        if path is None:
            path = "/"
        try:
            with contextutil.safe_while(sleep=1, tries=3) as proceed:
                while proceed():
                    try:
                        if rank == "all":
                            subtrees = []
                            for r in self.fs.get_ranks(status=status):
                                s = self.fs.rank_asok(["get", "subtrees"], status=status, rank=r['rank'])
                                s = filter(lambda s: s['auth_first'] == r['rank'] and s['auth_second'] == -2, s)
                                subtrees += s
                        else:
                            subtrees = self.fs.rank_asok(["get", "subtrees"], status=status, rank=rank)
                        subtrees = filter(lambda s: s['dir']['path'].startswith(path), subtrees)
                        return list(subtrees)
                    except CommandFailedError as e:
                        # Sometimes we get transient errors
                        if e.exitstatus == 22:
                            pass
                        else:
                            raise
        except contextutil.MaxWhileTries as e:
            raise RuntimeError(f"could not get subtree state from rank {rank}") from e

    def _wait_subtrees(self, test, status=None, rank=None, timeout=30, sleep=2, action=None, path=None):
        """
        Poll until the (path, auth_first) pairs of the subtrees match `test`
        exactly; optionally invoke `action` between polls. Returns the raw
        subtree list on success, raises RuntimeError on timeout.
        """
        test = sorted(test)
        try:
            with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
                while proceed():
                    subtrees = self._get_subtrees(status=status, rank=rank, path=path)
                    filtered = sorted([(s['dir']['path'], s['auth_first']) for s in subtrees])
                    log.info("%s =?= %s", filtered, test)
                    if filtered == test:
                        # Confirm export_pin in output is correct:
                        for s in subtrees:
                            if s['export_pin'] >= 0:
                                self.assertTrue(s['export_pin'] == s['auth_first'])
                        return subtrees
                    if action is not None:
                        action()
        except contextutil.MaxWhileTries as e:
            raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e

    def _wait_until_scrub_complete(self, path="/", recursive=True):
        """
        Start a scrub at `path` and poll "scrub status" until no scrubs are
        running (up to ~100s; gives up silently on timeout).
        """
        # NOTE: the parentheses around the conditional are required. Without
        # them the expression parses as `(base + ["recursive"]) if recursive
        # else []`, so a non-recursive scrub sent an empty command.
        out_json = self.fs.rank_tell(["scrub", "start", path] + (["recursive"] if recursive else []))
        with safe_while(sleep=10, tries=10) as proceed:
            while proceed():
                out_json = self.fs.rank_tell(["scrub", "status"])
                if out_json['status'] == "no active scrubs running":
                    break

    def _wait_distributed_subtrees(self, count, status=None, rank=None, path=None):
        """
        Poll until at least `count` subtrees carry a distributed ephemeral
        pin; returns them, or raises RuntimeError on timeout.
        """
        try:
            with contextutil.safe_while(sleep=5, tries=20) as proceed:
                while proceed():
                    subtrees = self._get_subtrees(status=status, rank=rank, path=path)
                    subtrees = list(filter(lambda s: s['distributed_ephemeral_pin'] == True, subtrees))
                    log.info(f"len={len(subtrees)} {subtrees}")
                    if len(subtrees) >= count:
                        return subtrees
        except contextutil.MaxWhileTries as e:
            raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e

    def _wait_random_subtrees(self, count, status=None, rank=None, path=None):
        """
        Poll until at least `count` subtrees carry a random ephemeral pin;
        returns them, or raises RuntimeError on timeout.
        """
        try:
            with contextutil.safe_while(sleep=5, tries=20) as proceed:
                while proceed():
                    subtrees = self._get_subtrees(status=status, rank=rank, path=path)
                    subtrees = list(filter(lambda s: s['random_ephemeral_pin'] == True, subtrees))
                    log.info(f"len={len(subtrees)} {subtrees}")
                    if len(subtrees) >= count:
                        return subtrees
        except contextutil.MaxWhileTries as e:
            raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e