]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | """ |
2 | MDS admin socket scrubbing-related tests. | |
3 | """ | |
4 | import json | |
5 | import logging | |
6 | import errno | |
7 | import time | |
8 | from teuthology.exceptions import CommandFailedError | |
f91f0fd5 | 9 | from teuthology.contextutil import safe_while |
7c673cae FG |
10 | import os |
11 | from tasks.cephfs.cephfs_test_case import CephFSTestCase | |
12 | ||
13 | log = logging.getLogger(__name__) | |
14 | ||
11fdf7f2 TL |
class TestScrubControls(CephFSTestCase):
    """
    Test basic scrub control operations such as abort, pause and resume.
    """

    MDSS_REQUIRED = 2
    CLIENTS_REQUIRED = 1

    def _abort_scrub(self, expected):
        """Issue `scrub abort` and assert it returned `expected`."""
        res = self.fs.run_scrub(["abort"])
        self.assertEqual(res['return_code'], expected)

    def _pause_scrub(self, expected):
        """Issue `scrub pause` and assert it returned `expected`."""
        res = self.fs.run_scrub(["pause"])
        self.assertEqual(res['return_code'], expected)

    def _resume_scrub(self, expected):
        """Issue `scrub resume` and assert it returned `expected`."""
        res = self.fs.run_scrub(["resume"])
        self.assertEqual(res['return_code'], expected)

    def _check_task_status(self, expected_status, timo=120):
        """ check scrub status for current active mds in ceph status """
        # BUG FIX: `timo` was previously ignored and the retry count was
        # hard-coded to 120; honor the caller-supplied timeout (1s per try).
        with safe_while(sleep=1, tries=timo, action='wait for task status') as proceed:
            while proceed():
                active = self.fs.get_active_names()
                log.debug("current active={0}".format(active))
                task_status = self.fs.get_task_status("scrub status")
                try:
                    if task_status[active[0]].startswith(expected_status):
                        return True
                except KeyError:
                    # the active mds may not have reported status yet
                    pass

    def _check_task_status_na(self, timo=120):
        """ check absence of scrub status in ceph status """
        # BUG FIX: honor `timo` instead of the hard-coded 120 tries.
        with safe_while(sleep=1, tries=timo, action='wait for task status') as proceed:
            while proceed():
                active = self.fs.get_active_names()
                log.debug("current active={0}".format(active))
                task_status = self.fs.get_task_status("scrub status")
                if not active[0] in task_status:
                    return True

    def create_scrub_data(self, test_dir):
        """Populate `test_dir` with 32 directories of 32 x 1MB files to scrub."""
        for i in range(32):
            dirname = "dir.{0}".format(i)
            dirpath = os.path.join(test_dir, dirname)
            self.mount_a.run_shell_payload(f"""
set -e
mkdir -p {dirpath}
for ((i = 0; i < 32; i++)); do
    dd if=/dev/urandom of={dirpath}/filename.$i bs=1M conv=fdatasync count=1
done
""")

    def test_scrub_abort(self):
        """Start a recursive scrub, abort it, and verify status clears."""
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # abort and verify
        self._abort_scrub(0)
        self.fs.wait_until_scrub_complete(sleep=5, timeout=30)

        # sleep enough to fetch updated task status
        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_pause_and_resume(self):
        """Pause a running scrub, verify PAUSED state, then resume and verify."""
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # resume and verify
        self._resume_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertFalse("PAUSED" in out_json['status'])

        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_pause_and_resume_with_abort(self):
        """Pause a scrub, abort while paused (stays PAUSED), then resume."""
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # abort and verify
        self._abort_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])
        self.assertTrue("0 inodes" in out_json['status'])

        # scrub status should still be paused...
        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # resume and verify
        self._resume_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("no active" in out_json['status'])

        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_task_status_on_mds_failover(self):
        """Verify scrub task status is cleared after the active mds fails over."""
        (original_active, ) = self.fs.get_active_names()
        original_standbys = self.mds_cluster.get_standby_daemons()

        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # Kill the rank 0
        self.fs.mds_stop(original_active)

        def promoted():
            active = self.fs.get_active_names()
            return active and active[0] in original_standbys

        log.info("Waiting for promotion of one of the original standbys {0}".format(
            original_standbys))
        self.wait_until_true(promoted, timeout=self.fs.beacon_timeout)

        self._check_task_status_na()
7c673cae FG |
class TestScrubChecks(CephFSTestCase):
    """
    Run flush and scrub commands on the specified files in the filesystem. This
    task will run through a sequence of operations, but it is not comprehensive
    on its own -- it doesn't manipulate the mds cache state to test on both
    in- and out-of-memory parts of the hierarchy. So it's designed to be run
    multiple times within a single test run, so that the test can manipulate
    memory state.

    Usage:
    mds_scrub_checks:
      mds_rank: 0
      path: path/to/test/dir
      client: 0
      run_seq: [0-9]+

    Increment the run_seq on subsequent invocations within a single test run;
    it uses that value to generate unique folder and file names.
    """

    MDSS_REQUIRED = 1
    CLIENTS_REQUIRED = 1

    def test_scrub_checks(self):
        self._checks(0)
        self._checks(1)

    def _checks(self, run_seq):
        """Run one pass of flush/scrub checks; `run_seq` uniquifies names."""
        mds_rank = 0
        test_dir = "scrub_test_path"

        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        log.info("Cloning repo into place")
        repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path)

        log.info("Initiating mds_scrub_checks on mds.{id_} test_path {path}, run_seq {seq}".format(
            id_=mds_rank, path=abs_test_path, seq=run_seq)
        )

        # PEP 8 (E731): a def, not a lambda assignment
        def success_validator(j, r):
            return self.json_validator(j, r, "return_code", 0)

        nep = "{test_path}/i/dont/exist".format(test_path=abs_test_path)
        self.asok_command(mds_rank, "flush_path {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))
        self.tell_command(mds_rank, "scrub start {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))

        test_repo_path = "{test_path}/ceph-qa-suite".format(test_path=abs_test_path)
        dirpath = "{repo_path}/suites".format(repo_path=test_repo_path)

        if run_seq == 0:
            log.info("First run: flushing {dirpath}".format(dirpath=dirpath))
            command = "flush_path {dirpath}".format(dirpath=dirpath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {dirpath}".format(dirpath=dirpath)
        self.tell_command(mds_rank, command, success_validator)

        filepath = "{repo_path}/suites/fs/verify/validater/valgrind.yaml".format(
            repo_path=test_repo_path)
        if run_seq == 0:
            log.info("First run: flushing {filepath}".format(filepath=filepath))
            command = "flush_path {filepath}".format(filepath=filepath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {filepath}".format(filepath=filepath)
        self.tell_command(mds_rank, command, success_validator)

        if run_seq == 0:
            log.info("First run: flushing base dir /")
            command = "flush_path /"
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start /"
        self.tell_command(mds_rank, command, success_validator)

        new_dir = "{repo_path}/new_dir_{i}".format(repo_path=repo_path, i=run_seq)
        test_new_dir = "{repo_path}/new_dir_{i}".format(repo_path=test_repo_path,
                                                        i=run_seq)
        self.mount_a.run_shell(["mkdir", new_dir])
        command = "flush_path {dir}".format(dir=test_new_dir)
        self.asok_command(mds_rank, command, success_validator)

        new_file = "{repo_path}/new_file_{i}".format(repo_path=repo_path,
                                                     i=run_seq)
        test_new_file = "{repo_path}/new_file_{i}".format(repo_path=test_repo_path,
                                                          i=run_seq)
        self.mount_a.write_n_mb(new_file, 1)

        command = "flush_path {file}".format(file=test_new_file)
        self.asok_command(mds_rank, command, success_validator)

        # check that scrub fails on errors
        ino = self.mount_a.path_to_ino(new_file)
        rados_obj_name = "{ino:x}.00000000".format(ino=ino)
        command = "scrub start {file}".format(file=test_new_file)

        def _check_and_clear_damage(ino, dtype):
            # Return whether damage of type `dtype` was reported for `ino`,
            # clearing any matching entries as a side effect.
            all_damage = self.fs.rank_tell(["damage", "ls"], mds_rank)
            damage = [d for d in all_damage if d['ino'] == ino and d['damage_type'] == dtype]
            for d in damage:
                self.fs.mon_manager.raw_cluster_cmd(
                    'tell', 'mds.{0}'.format(self.fs.get_active_names()[mds_rank]),
                    "damage", "rm", str(d['id']))
            return len(damage) > 0

        # Missing parent xattr
        self.assertFalse(_check_and_clear_damage(ino, "backtrace"))
        self.fs.rados(["rmxattr", rados_obj_name, "parent"], pool=self.fs.get_data_pool_name())
        self.tell_command(mds_rank, command, success_validator)
        self.fs.wait_until_scrub_complete(sleep=5, timeout=30)
        self.assertTrue(_check_and_clear_damage(ino, "backtrace"))

        command = "flush_path /"
        self.asok_command(mds_rank, command, success_validator)

    def test_scrub_repair(self):
        """Corrupt a dirfrag's fragstat and verify `scrub ... repair` fixes it."""
        mds_rank = 0
        test_dir = "scrub_repair_path"

        self.mount_a.run_shell(["mkdir", test_dir])
        self.mount_a.run_shell(["touch", "{0}/file".format(test_dir)])
        dir_objname = "{:x}.00000000".format(self.mount_a.path_to_ino(test_dir))

        self.mount_a.umount_wait()

        # flush journal entries to dirfrag objects, and expire journal
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_stop()

        # remove the dentry from dirfrag, cause incorrect fragstat/rstat
        self.fs.radosm(["rmomapkey", dir_objname, "file_head"])

        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount_wait()

        # fragstat indicates the directory is not empty, rmdir should fail
        with self.assertRaises(CommandFailedError) as ar:
            self.mount_a.run_shell(["rmdir", test_dir])
        self.assertEqual(ar.exception.exitstatus, 1)

        self.tell_command(mds_rank, "scrub start /{0} repair".format(test_dir),
                          lambda j, r: self.json_validator(j, r, "return_code", 0))

        # wait a few second for background repair
        time.sleep(10)

        # fragstat should be fixed
        self.mount_a.run_shell(["rmdir", test_dir])

    @staticmethod
    def json_validator(json_out, rc, element, expected_value):
        """Validate that `json_out[element] == expected_value` and rc == 0.

        Returns a (success, message) tuple as expected by asok/tell_command.
        """
        if rc != 0:
            return False, "asok command returned error {rc}".format(rc=rc)
        element_value = json_out.get(element)
        if element_value != expected_value:
            return False, "unexpectedly got {jv} instead of {ev}!".format(
                jv=element_value, ev=expected_value)
        return True, "Succeeded"

    def tell_command(self, mds_rank, command, validator):
        """Run `command` via `ceph tell mds.<rank>` and validate the JSON reply."""
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()
        jout = self.fs.rank_tell(command_list, mds_rank)

        log.info("command '{command}' returned '{jout}'".format(
            command=command, jout=jout))

        success, errstring = validator(jout, 0)
        if not success:
            raise AsokCommandFailedError(command, 0, jout, errstring)
        return jout

    def asok_command(self, mds_rank, command, validator):
        """Run `command` on the rank's admin socket and validate the JSON reply."""
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()

        # we just assume there's an active mds for every rank
        mds_id = self.fs.get_active_names()[mds_rank]
        proc = self.fs.mon_manager.admin_socket('mds', mds_id,
                                                command_list, check_status=False)
        rout = proc.exitstatus
        sout = proc.stdout.getvalue()

        if sout.strip():
            jout = json.loads(sout)
        else:
            jout = None

        log.info("command '{command}' got response code '{rout}' and stdout '{sout}'".format(
            command=command, rout=rout, sout=sout))

        success, errstring = validator(jout, rout)

        if not success:
            raise AsokCommandFailedError(command, rout, jout, errstring)

        return jout

    @staticmethod
    def clone_repo(client_mount, path):
        """Clone ceph-qa-suite under `path` (idempotent) and return its path."""
        repo = "ceph-qa-suite"
        repo_path = os.path.join(path, repo)
        client_mount.run_shell(["mkdir", "-p", path])

        try:
            client_mount.stat(repo_path)
        except CommandFailedError:
            client_mount.run_shell([
                "git", "clone", '--branch', 'giant',
                "http://github.com/ceph/{repo}".format(repo=repo),
                "{path}/{repo}".format(path=path, repo=repo)
            ])

        return repo_path
403 | ||
404 | ||
class AsokCommandFailedError(Exception):
    """
    Exception thrown when we get an unexpected response
    on an admin socket command
    """

    def __init__(self, command, rc, json_out, errstring):
        # Preserve full context of the failed admin-socket call for reporting.
        self.command = command
        self.rc = rc
        self.json = json_out
        self.errstring = errstring

    def __str__(self):
        template = "Admin socket: {command} failed with rc={rc} json output={json}, because '{es}'"
        return template.format(command=self.command,
                               rc=self.rc,
                               json=self.json,
                               es=self.errstring)