1 | """ |
2 | MDS admin socket scrubbing-related tests. | |
3 | """ | |
4 | import json | |
5 | import logging | |
6 | import errno | |
7 | import time | |
8 | from teuthology.exceptions import CommandFailedError | |
f91f0fd5 | 9 | from teuthology.contextutil import safe_while |
7c673cae FG |
10 | import os |
11 | from tasks.cephfs.cephfs_test_case import CephFSTestCase | |
12 | ||
13 | log = logging.getLogger(__name__) | |
14 | ||

class TestScrubControls(CephFSTestCase):
    """
    Test basic scrub control operations such as abort, pause and resume.
    """

    MDSS_REQUIRED = 2
    CLIENTS_REQUIRED = 1

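    # Thin wrappers around fs.run_scrub(), which issues the MDS "scrub"
    # tell command ("abort", "pause", "resume") and returns the decoded
    # JSON reply; each wrapper asserts on the reported return_code.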
    def _abort_scrub(self, expected):
        res = self.fs.run_scrub(["abort"])
        self.assertEqual(res['return_code'], expected)

    def _pause_scrub(self, expected):
        res = self.fs.run_scrub(["pause"])
        self.assertEqual(res['return_code'], expected)

    def _resume_scrub(self, expected):
        res = self.fs.run_scrub(["resume"])
        self.assertEqual(res['return_code'], expected)

    def _check_task_status(self, expected_status, timo=120):
        """ check scrub status for current active mds in ceph status """
        # poll roughly once a second for up to timo tries
        with safe_while(sleep=1, tries=timo, action='wait for task status') as proceed:
            while proceed():
                active = self.fs.get_active_names()
                log.debug("current active={0}".format(active))
                task_status = self.fs.get_task_status("scrub status")
                try:
                    if task_status[active[0]].startswith(expected_status):
                        return True
                except KeyError:
                    pass

    def _check_task_status_na(self, timo=120):
        """ check absence of scrub status in ceph status """
        with safe_while(sleep=1, tries=timo, action='wait for task status') as proceed:
            while proceed():
                active = self.fs.get_active_names()
                log.debug("current active={0}".format(active))
                task_status = self.fs.get_task_status("scrub status")
                if active[0] not in task_status:
                    return True

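    # Lay down 32 directories of 32 x 1MB files (1GB in total) so that a
    # recursive scrub has enough work in flight to be paused, resumed or
    # aborted part-way through.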
    def create_scrub_data(self, test_dir):
        for i in range(32):
            dirname = "dir.{0}".format(i)
            dirpath = os.path.join(test_dir, dirname)
            self.mount_a.run_shell_payload(f"""
set -e
mkdir -p {dirpath}
for ((i = 0; i < 32; i++)); do
    dd if=/dev/urandom of={dirpath}/filename.$i bs=1M conv=fdatasync count=1
done
""")

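    # Each test below writes a fresh data set, starts a recursive scrub at
    # the test path, and then exercises control operations against the
    # in-flight scrub.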
    def test_scrub_abort(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # abort and verify
        self._abort_scrub(0)
        self.fs.wait_until_scrub_complete(sleep=5, timeout=30)

        # wait long enough for the updated task status to be reported
        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_pause_and_resume(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertIn("PAUSED", out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # resume and verify
        self._resume_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertNotIn("PAUSED", out_json['status'])

        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_pause_and_resume_with_abort(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertIn("PAUSED", out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # abort and verify: the abort empties the scrub stack, but the
        # scrub itself remains paused
        self._abort_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertIn("PAUSED", out_json['status'])
        self.assertIn("0 inodes", out_json['status'])

        # scrub status should still be paused...
        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # resume and verify
        self._resume_scrub(0)
        self.assertTrue(self.fs.wait_until_scrub_complete(sleep=5, timeout=30))

        checked = self._check_task_status_na()
        self.assertTrue(checked)

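    # A scrub's task status is reported by the active MDS; after a failover
    # to a standby, the new active MDS should report no scrub task status
    # at all.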
    def test_scrub_task_status_on_mds_failover(self):
        (original_active, ) = self.fs.get_active_names()
        original_standbys = self.mds_cluster.get_standby_daemons()

        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertIn("PAUSED", out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # kill the rank 0 MDS
        self.fs.mds_stop(original_active)

        def promoted():
            active = self.fs.get_active_names()
            return active and active[0] in original_standbys

        log.info("Waiting for promotion of one of the original standbys {0}".format(
            original_standbys))
        self.wait_until_true(promoted, timeout=self.fs.beacon_timeout)

        self._check_task_status_na()


class TestScrubChecks(CephFSTestCase):
    """
    Run flush and scrub commands on the specified files in the filesystem.
    This task runs through a sequence of operations, but it is not
    comprehensive on its own -- it does not manipulate the MDS cache state
    to test both the in-memory and out-of-memory parts of the hierarchy.
    It is therefore designed to be run multiple times within a single test
    run, so that the test can manipulate memory state in between.

    Usage:
    mds_scrub_checks:
      mds_rank: 0
      path: path/to/test/dir
      client: 0
      run_seq: [0-9]+

    Increment run_seq on subsequent invocations within a single test run;
    it is used to generate unique folder and file names.
    """

    MDSS_REQUIRED = 1
    CLIENTS_REQUIRED = 1

    def test_scrub_checks(self):
        self._checks(0)
        self._checks(1)

    def _checks(self, run_seq):
        mds_rank = 0
        test_dir = "scrub_test_path"

        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        log.info("Cloning repo into place")
        repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path)

        log.info("Initiating mds_scrub_checks on mds.{id_} test_path {path}, run_seq {seq}".format(
            id_=mds_rank, path=abs_test_path, seq=run_seq))

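        # validators receive the decoded JSON reply plus the command's exit
        # status and return a (success, error_string) tuple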
        success_validator = lambda j, r: self.json_validator(j, r, "return_code", 0)

        nep = "{test_path}/i/dont/exist".format(test_path=abs_test_path)
        self.asok_command(mds_rank, "flush_path {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))
        self.tell_command(mds_rank, "scrub start {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))

        test_repo_path = "{test_path}/ceph-qa-suite".format(test_path=abs_test_path)
        dirpath = "{repo_path}/suites".format(repo_path=test_repo_path)

        if run_seq == 0:
            log.info("First run: flushing {dirpath}".format(dirpath=dirpath))
            command = "flush_path {dirpath}".format(dirpath=dirpath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {dirpath}".format(dirpath=dirpath)
        self.tell_command(mds_rank, command, success_validator)

        filepath = "{repo_path}/suites/fs/verify/validater/valgrind.yaml".format(
            repo_path=test_repo_path)
        if run_seq == 0:
            log.info("First run: flushing {filepath}".format(filepath=filepath))
            command = "flush_path {filepath}".format(filepath=filepath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {filepath}".format(filepath=filepath)
        self.tell_command(mds_rank, command, success_validator)

        if run_seq == 0:
            log.info("First run: flushing base dir /")
            command = "flush_path /"
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start /"
        self.tell_command(mds_rank, command, success_validator)

        new_dir = "{repo_path}/new_dir_{i}".format(repo_path=repo_path, i=run_seq)
        test_new_dir = "{repo_path}/new_dir_{i}".format(repo_path=test_repo_path,
                                                        i=run_seq)
        self.mount_a.run_shell(["mkdir", new_dir])
        command = "flush_path {dir}".format(dir=test_new_dir)
        self.asok_command(mds_rank, command, success_validator)

        new_file = "{repo_path}/new_file_{i}".format(repo_path=repo_path,
                                                     i=run_seq)
        test_new_file = "{repo_path}/new_file_{i}".format(repo_path=test_repo_path,
                                                          i=run_seq)
        self.mount_a.write_n_mb(new_file, 1)

        command = "flush_path {file}".format(file=test_new_file)
        self.asok_command(mds_rank, command, success_validator)

        # check that scrub fails on errors
        ino = self.mount_a.path_to_ino(new_file)
        rados_obj_name = "{ino:x}.00000000".format(ino=ino)
        command = "scrub start {file}".format(file=test_new_file)

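        # list damage entries for this inode and damage type via "damage ls",
        # clear any matches with "damage rm", and report whether any were found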
        def _check_and_clear_damage(ino, dtype):
            all_damage = self.fs.rank_tell(["damage", "ls"], mds_rank)
            damage = [d for d in all_damage if d['ino'] == ino and d['damage_type'] == dtype]
            for d in damage:
                self.fs.mon_manager.raw_cluster_cmd(
                    'tell', 'mds.{0}'.format(self.fs.get_active_names()[mds_rank]),
                    "damage", "rm", str(d['id']))
            return len(damage) > 0

        # Missing parent xattr: removing the "parent" xattr from the file's
        # first data object destroys its backtrace, which scrub should
        # detect and record as "backtrace" damage.
        self.assertFalse(_check_and_clear_damage(ino, "backtrace"))
        self.fs.rados(["rmxattr", rados_obj_name, "parent"], pool=self.fs.get_data_pool_name())
        self.tell_command(mds_rank, command, success_validator)
        self.fs.wait_until_scrub_complete(sleep=5, timeout=30)
        self.assertTrue(_check_and_clear_damage(ino, "backtrace"))

        command = "flush_path /"
        self.asok_command(mds_rank, command, success_validator)

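    # Damage a directory by deleting one of its dentries directly from the
    # dirfrag object's omap, then verify that "scrub start ... repair"
    # corrects the resulting bogus fragstat so the directory can be removed.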
    def test_scrub_repair(self):
        mds_rank = 0
        test_dir = "scrub_repair_path"

        self.mount_a.run_shell(["mkdir", test_dir])
        self.mount_a.run_shell(["touch", "{0}/file".format(test_dir)])
        dir_objname = "{:x}.00000000".format(self.mount_a.path_to_ino(test_dir))

        self.mount_a.umount_wait()

        # flush journal entries to dirfrag objects, and expire the journal
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_stop()

        # remove the dentry from the dirfrag, causing an incorrect fragstat/rstat
        self.fs.radosm(["rmomapkey", dir_objname, "file_head"])

        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount_wait()

        # the fragstat indicates the directory is not empty, so rmdir should fail
        with self.assertRaises(CommandFailedError) as ar:
            self.mount_a.run_shell(["rmdir", test_dir])
        self.assertEqual(ar.exception.exitstatus, 1)

        self.tell_command(mds_rank, "scrub start /{0} repair".format(test_dir),
                          lambda j, r: self.json_validator(j, r, "return_code", 0))

        # wait a few seconds for the background repair to complete
        time.sleep(10)

        # the fragstat should now be fixed
        self.mount_a.run_shell(["rmdir", test_dir])

    @staticmethod
    def json_validator(json_out, rc, element, expected_value):
        if rc != 0:
            return False, "asok command returned error {rc}".format(rc=rc)
        element_value = json_out.get(element)
        if element_value != expected_value:
            return False, "unexpectedly got {jv} instead of {ev}!".format(
                jv=element_value, ev=expected_value)
        return True, "Succeeded"

    def tell_command(self, mds_rank, command, validator):
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()
        jout = self.fs.rank_tell(command_list, mds_rank)

        log.info("command '{command}' returned '{jout}'".format(
            command=command, jout=jout))

        success, errstring = validator(jout, 0)
        if not success:
            raise AsokCommandFailedError(command, 0, jout, errstring)
        return jout

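    # unlike tell_command() above, which routes through the MDS tell
    # interface, this helper talks directly to the daemon's admin socket
    # and so also surfaces the socket command's exit status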
    def asok_command(self, mds_rank, command, validator):
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()

        # we just assume there's an active mds for every rank
        mds_id = self.fs.get_active_names()[mds_rank]
        proc = self.fs.mon_manager.admin_socket('mds', mds_id,
                                                command_list, check_status=False)
        rout = proc.exitstatus
        sout = proc.stdout.getvalue()

        if sout.strip():
            jout = json.loads(sout)
        else:
            jout = None

        log.info("command '{command}' got response code '{rout}' and stdout '{sout}'".format(
            command=command, rout=rout, sout=sout))

        success, errstring = validator(jout, rout)

        if not success:
            raise AsokCommandFailedError(command, rout, jout, errstring)

        return jout

    @staticmethod
    def clone_repo(client_mount, path):
        repo = "ceph-qa-suite"
        repo_path = os.path.join(path, repo)
        client_mount.run_shell(["mkdir", "-p", path])

        # clone only if the repo is not already in place from a previous run
        try:
            client_mount.stat(repo_path)
        except CommandFailedError:
            client_mount.run_shell([
                "git", "clone", '--branch', 'giant',
                "http://github.com/ceph/{repo}".format(repo=repo),
                "{path}/{repo}".format(path=path, repo=repo)
            ])

        return repo_path


class AsokCommandFailedError(Exception):
    """
    Exception thrown when we get an unexpected response
    on an admin socket command
    """

    def __init__(self, command, rc, json_out, errstring):
        self.command = command
        self.rc = rc
        self.json = json_out
        self.errstring = errstring

    def __str__(self):
        return "Admin socket: {command} failed with rc={rc} json output={json}, because '{es}'".format(
            command=self.command, rc=self.rc, json=self.json, es=self.errstring)