"""
MDS admin socket scrubbing-related tests.
"""
import errno
import json
import logging
import os
import time

from teuthology.exceptions import CommandFailedError
from teuthology.contextutil import safe_while

from tasks.cephfs.cephfs_test_case import CephFSTestCase

log = logging.getLogger(__name__)
class TestScrubControls(CephFSTestCase):
    """
    Test basic scrub control operations such as abort, pause and resume.
    """

    MDSS_REQUIRED = 2
    CLIENTS_REQUIRED = 1

    def _abort_scrub(self, expected):
        """Issue 'scrub abort' and assert that it returns `expected`."""
        res = self.fs.run_scrub(["abort"])
        self.assertEqual(res['return_code'], expected)

    def _pause_scrub(self, expected):
        """Issue 'scrub pause' and assert that it returns `expected`."""
        res = self.fs.run_scrub(["pause"])
        self.assertEqual(res['return_code'], expected)

    def _resume_scrub(self, expected):
        """Issue 'scrub resume' and assert that it returns `expected`."""
        res = self.fs.run_scrub(["resume"])
        self.assertEqual(res['return_code'], expected)

    def _check_task_status(self, expected_status, timo=120):
        """ check scrub status for current active mds in ceph status """
        with safe_while(sleep=1, tries=120, action='wait for task status') as proceed:
            while proceed():
                active = self.fs.get_active_names()
                log.debug("current active={0}".format(active))
                task_status = self.fs.get_task_status("scrub status")
                # task status is keyed by daemon name; the entry for the
                # active mds may lag, hence the polling loop above
                if task_status[active[0]].startswith(expected_status):
                    return True
        return False

    def _check_task_status_na(self, timo=120):
        """ check absence of scrub status in ceph status """
        with safe_while(sleep=1, tries=120, action='wait for task status') as proceed:
            while proceed():
                active = self.fs.get_active_names()
                log.debug("current active={0}".format(active))
                task_status = self.fs.get_task_status("scrub status")
                if not active[0] in task_status:
                    return True
        return False

    def create_scrub_data(self, test_dir):
        """Populate `test_dir` with 32 subdirectories of 32 x 1MB random files
        so that a recursive scrub has enough work to be paused/aborted."""
        for i in range(32):
            dirname = "dir.{0}".format(i)
            dirpath = os.path.join(test_dir, dirname)
            self.mount_a.run_shell_payload(f"""
set -e
mkdir {dirpath}
for ((i = 0; i < 32; i++)); do
    dd if=/dev/urandom of={dirpath}/filename.$i bs=1M conv=fdatasync count=1
done
""")

    def test_scrub_abort(self):
        """Abort a recursive scrub and verify its task status disappears."""
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # abort and verify
        self._abort_scrub(0)
        self.fs.wait_until_scrub_complete(sleep=5, timeout=30)

        # sleep enough to fetch updated task status
        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_pause_and_resume(self):
        """Pause a recursive scrub, verify, then resume and verify again."""
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # resume and verify
        self._resume_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertFalse("PAUSED" in out_json['status'])

        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_pause_and_resume_with_abort(self):
        """Pause a scrub, abort it while paused, then resume; the scrub should
        end up with no active work."""
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # abort and verify; an abort on a paused scrub drains the queue but
        # leaves the scrub in the paused state
        self._abort_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])
        self.assertTrue("0 inodes" in out_json['status'])

        # scrub status should still be paused...
        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # resume and verify
        self._resume_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("no active" in out_json['status'])

        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_task_status_on_mds_failover(self):
        """Scrub task status must be cleared once the active MDS fails over
        to a standby."""
        (original_active, ) = self.fs.get_active_names()
        original_standbys = self.mds_cluster.get_standby_daemons()

        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # stop the active mds so that one of the standbys takes over
        self.fs.mds_stop(original_active)

        def promoted():
            # a standby has been promoted once it shows up as rank 0
            active = self.fs.get_active_names()
            return active and active[0] in original_standbys

        log.info("Waiting for promotion of one of the original standbys {0}".format(
            original_standbys))
        self.wait_until_true(promoted, timeout=self.fs.beacon_timeout)

        self._check_task_status_na()
class TestScrubChecks(CephFSTestCase):
    """
    Run flush and scrub commands on the specified files in the filesystem. This
    task will run through a sequence of operations, but it is not comprehensive
    on its own -- it doesn't manipulate the mds cache state to test on both
    in- and out-of-memory parts of the hierarchy. So it's designed to be run
    multiple times within a single test run, so that the test can manipulate
    memory state.

    Usage:
    mds_scrub_checks:
      mds_rank: 0
      path: path/to/test/dir
      client: 0
      run_seq: [0-9]+

    Increment the run_seq on subsequent invocations within a single test run;
    it uses that value to generate unique folder and file names.
    """

    MDSS_REQUIRED = 1
    CLIENTS_REQUIRED = 1

    def test_scrub_checks(self):
        # run the sequence twice so both cold and warm cache paths are hit
        self._checks(0)
        self._checks(1)

    def _checks(self, run_seq):
        """Run one pass of flush_path/scrub start commands against a cloned
        repo; `run_seq` makes folder/file names unique per pass."""
        mds_rank = 0
        test_dir = "scrub_test_path"

        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        log.info("Cloning repo into place")
        repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path)

        log.info("Initiating mds_scrub_checks on mds.{id_} test_path {path}, run_seq {seq}".format(
            id_=mds_rank, path=abs_test_path, seq=run_seq)
        )

        success_validator = lambda j, r: self.json_validator(j, r, "return_code", 0)

        # commands against a nonexistent path must fail with ENOENT
        nep = "{test_path}/i/dont/exist".format(test_path=abs_test_path)
        self.asok_command(mds_rank, "flush_path {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))
        self.tell_command(mds_rank, "scrub start {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))

        test_repo_path = "{test_path}/ceph-qa-suite".format(test_path=abs_test_path)
        dirpath = "{repo_path}/suites".format(repo_path=test_repo_path)

        if run_seq == 0:
            log.info("First run: flushing {dirpath}".format(dirpath=dirpath))
            command = "flush_path {dirpath}".format(dirpath=dirpath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {dirpath}".format(dirpath=dirpath)
        self.tell_command(mds_rank, command, success_validator)

        filepath = "{repo_path}/suites/fs/verify/validater/valgrind.yaml".format(
            repo_path=test_repo_path)
        if run_seq == 0:
            log.info("First run: flushing {filepath}".format(filepath=filepath))
            command = "flush_path {filepath}".format(filepath=filepath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {filepath}".format(filepath=filepath)
        self.tell_command(mds_rank, command, success_validator)

        if run_seq == 0:
            log.info("First run: flushing base dir /")
            command = "flush_path /"
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start /"
        self.tell_command(mds_rank, command, success_validator)

        new_dir = "{repo_path}/new_dir_{i}".format(repo_path=repo_path, i=run_seq)
        test_new_dir = "{repo_path}/new_dir_{i}".format(repo_path=test_repo_path,
                                                        i=run_seq)
        self.mount_a.run_shell(["mkdir", new_dir])
        command = "flush_path {dir}".format(dir=test_new_dir)
        self.asok_command(mds_rank, command, success_validator)

        new_file = "{repo_path}/new_file_{i}".format(repo_path=repo_path,
                                                     i=run_seq)
        test_new_file = "{repo_path}/new_file_{i}".format(repo_path=test_repo_path,
                                                          i=run_seq)
        self.mount_a.write_n_mb(new_file, 1)

        command = "flush_path {file}".format(file=test_new_file)
        self.asok_command(mds_rank, command, success_validator)

        # check that scrub fails on errors
        ino = self.mount_a.path_to_ino(new_file)
        rados_obj_name = "{ino:x}.00000000".format(ino=ino)
        command = "scrub start {file}".format(file=test_new_file)

        def _check_and_clear_damage(ino, dtype):
            # return whether damage of type `dtype` was recorded for `ino`,
            # clearing any matching entries as a side effect
            all_damage = self.fs.rank_tell(["damage", "ls"], mds_rank)
            damage = [d for d in all_damage if d['ino'] == ino and d['damage_type'] == dtype]
            for d in damage:
                self.fs.mon_manager.raw_cluster_cmd(
                    'tell', 'mds.{0}'.format(self.fs.get_active_names()[mds_rank]),
                    "damage", "rm", str(d['id']))
            return len(damage) > 0

        # Missing parent xattr
        self.assertFalse(_check_and_clear_damage(ino, "backtrace"))
        self.fs.rados(["rmxattr", rados_obj_name, "parent"], pool=self.fs.get_data_pool_name())
        self.tell_command(mds_rank, command, success_validator)
        self.fs.wait_until_scrub_complete(sleep=5, timeout=30)
        self.assertTrue(_check_and_clear_damage(ino, "backtrace"))

        command = "flush_path /"
        self.asok_command(mds_rank, command, success_validator)

    def test_scrub_repair(self):
        """Corrupt a dirfrag (remove a dentry omap key), then verify that
        'scrub start ... repair' fixes the resulting bad fragstat."""
        mds_rank = 0
        test_dir = "scrub_repair_path"

        self.mount_a.run_shell(["mkdir", test_dir])
        self.mount_a.run_shell(["touch", "{0}/file".format(test_dir)])
        dir_objname = "{:x}.00000000".format(self.mount_a.path_to_ino(test_dir))

        self.mount_a.umount_wait()

        # flush journal entries to dirfrag objects, and expire journal
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_stop()

        # remove the dentry from dirfrag, cause incorrect fragstat/rstat
        self.fs.radosm(["rmomapkey", dir_objname, "file_head"])

        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount_wait()

        # fragstat indicates the directory is not empty, rmdir should fail
        with self.assertRaises(CommandFailedError) as ar:
            self.mount_a.run_shell(["rmdir", test_dir])
        self.assertEqual(ar.exception.exitstatus, 1)

        self.tell_command(mds_rank, "scrub start /{0} repair".format(test_dir),
                          lambda j, r: self.json_validator(j, r, "return_code", 0))

        # wait a few second for background repair
        time.sleep(10)

        # fragstat should be fixed
        self.mount_a.run_shell(["rmdir", test_dir])

    @staticmethod
    def json_validator(json_out, rc, element, expected_value):
        """Return (ok, message): ok is True iff rc == 0 and
        json_out[element] == expected_value."""
        if rc != 0:
            return False, "asok command returned error {rc}".format(rc=rc)
        element_value = json_out.get(element)
        if element_value != expected_value:
            return False, "unexpectedly got {jv} instead of {ev}!".format(
                jv=element_value, ev=expected_value)
        return True, "Succeeded"

    def tell_command(self, mds_rank, command, validator):
        """Run `command` via 'ceph tell mds.<rank>' and validate the JSON
        reply; raise AsokCommandFailedError when the validator rejects it."""
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()
        jout = self.fs.rank_tell(command_list, mds_rank)

        log.info("command '{command}' returned '{jout}'".format(
            command=command, jout=jout))

        success, errstring = validator(jout, 0)
        if not success:
            raise AsokCommandFailedError(command, 0, jout, errstring)
        return jout

    def asok_command(self, mds_rank, command, validator):
        """Run `command` on the admin socket of the active mds for
        `mds_rank` and validate (json, exitstatus); raise
        AsokCommandFailedError when the validator rejects the response."""
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()

        # we just assume there's an active mds for every rank
        mds_id = self.fs.get_active_names()[mds_rank]
        proc = self.fs.mon_manager.admin_socket('mds', mds_id,
                                                command_list, check_status=False)
        rout = proc.exitstatus
        sout = proc.stdout.getvalue()

        if len(sout) != 0:
            jout = json.loads(sout)
        else:
            jout = None

        log.info("command '{command}' got response code '{rout}' and stdout '{sout}'".format(
            command=command, rout=rout, sout=sout))

        success, errstring = validator(jout, rout)

        if not success:
            raise AsokCommandFailedError(command, rout, jout, errstring)

        return jout

    @staticmethod
    def clone_repo(client_mount, path):
        """Clone ceph-qa-suite under `path` (skipped when it is already
        present) and return the path of the clone."""
        repo = "ceph-qa-suite"
        repo_path = os.path.join(path, repo)
        client_mount.run_shell(["mkdir", "-p", path])

        try:
            client_mount.stat(repo_path)
        except CommandFailedError:
            client_mount.run_shell([
                "git", "clone", '--branch', 'giant',
                "http://github.com/ceph/{repo}".format(repo=repo),
                "{path}/{repo}".format(path=path, repo=repo)
            ])

        return repo_path
class AsokCommandFailedError(Exception):
    """
    Exception thrown when we get an unexpected response
    on an admin socket command
    """

    def __init__(self, command, rc, json_out, errstring):
        self.command = command      # the admin socket command that failed
        self.rc = rc                # the command's return code
        self.json = json_out        # parsed JSON response (may be None)
        self.errstring = errstring  # human-readable reason from the validator

    def __str__(self):
        return "Admin socket: {command} failed with rc={rc} json output={json}, because '{es}'".format(
            command=self.command, rc=self.rc, json=self.json, es=self.errstring)