"""
MDS admin socket scrubbing-related tests.
"""
import json
import logging
import errno
import time
import os

from teuthology.exceptions import CommandFailedError
from teuthology.contextutil import safe_while

from tasks.cephfs.cephfs_test_case import CephFSTestCase

log = logging.getLogger(__name__)
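
# Note: these tests are normally driven by teuthology; they can also be run
# against a local vstart cluster with vstart_runner (exact paths vary by
# checkout/build layout), e.g.:
#   python3 ../qa/tasks/vstart_runner.py tasks.cephfs.test_scrub_checks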


class TestScrubControls(CephFSTestCase):
    """
    Test basic scrub control operations such as abort, pause and resume.
    """

    MDSS_REQUIRED = 2
    CLIENTS_REQUIRED = 1
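
    # The helpers below issue scrub control commands through
    # Filesystem.run_scrub() and assert on the 'return_code' field of the
    # JSON each command returns.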
    def _abort_scrub(self, expected):
        res = self.fs.run_scrub(["abort"])
        self.assertEqual(res['return_code'], expected)

    def _pause_scrub(self, expected):
        res = self.fs.run_scrub(["pause"])
        self.assertEqual(res['return_code'], expected)

    def _resume_scrub(self, expected):
        res = self.fs.run_scrub(["resume"])
        self.assertEqual(res['return_code'], expected)

    def _check_task_status(self, expected_status, timo=120):
        """ check scrub status for current active mds in ceph status """
        with safe_while(sleep=1, tries=timo, action='wait for task status') as proceed:
            while proceed():
                active = self.fs.get_active_names()
                log.debug("current active={0}".format(active))
                task_status = self.fs.get_task_status("scrub status")
                try:
                    if task_status[active[0]].startswith(expected_status):
                        return True
                except KeyError:
                    pass

    def _check_task_status_na(self, timo=120):
        """ check absence of scrub status in ceph status """
        with safe_while(sleep=1, tries=timo, action='wait for task status') as proceed:
            while proceed():
                active = self.fs.get_active_names()
                log.debug("current active={0}".format(active))
                task_status = self.fs.get_task_status("scrub status")
                if active[0] not in task_status:
                    return True
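
    # Create enough data (a tree of small directories and 1MB files) that a
    # recursive scrub stays in flight long enough for the tests below to
    # pause, resume and abort it mid-run.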
    def create_scrub_data(self, test_dir):
        for i in range(32):
            dirname = "dir.{0}".format(i)
            dirpath = os.path.join(test_dir, dirname)
            self.mount_a.run_shell_payload(f"""
set -e
mkdir {dirpath}
for ((i = 0; i < 32; i++)); do
    dd if=/dev/urandom of={dirpath}/filename.$i bs=1M conv=fdatasync count=1
done
""")

    def test_scrub_abort(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # abort and verify
        self._abort_scrub(0)
        self.fs.wait_until_scrub_complete(sleep=5, timeout=30)

        # sleep enough to fetch updated task status
        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_pause_and_resume(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # resume and verify
        self._resume_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertFalse("PAUSED" in out_json['status'])

        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_pause_and_resume_with_abort(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # abort and verify
        self._abort_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])
        self.assertTrue("0 inodes" in out_json['status'])

        # scrub status should still be paused...
        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # resume and verify
        self._resume_scrub(0)
        self.assertTrue(self.fs.wait_until_scrub_complete(sleep=5, timeout=30))

        checked = self._check_task_status_na()
        self.assertTrue(checked)
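
    # Once the active MDS is stopped and a standby gets promoted, the old
    # active's "scrub status" entry should disappear from ceph status.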
    def test_scrub_task_status_on_mds_failover(self):
        (original_active, ) = self.fs.get_active_names()
        original_standbys = self.mds_cluster.get_standby_daemons()

        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # stop the active mds
        self.fs.mds_stop(original_active)

        def promoted():
            active = self.fs.get_active_names()
            return active and active[0] in original_standbys

        log.info("Waiting for promotion of one of the original standbys {0}".format(
            original_standbys))
        self.wait_until_true(promoted, timeout=self.fs.beacon_timeout)

        self._check_task_status_na()


class TestScrubChecks(CephFSTestCase):
    """
    Run flush and scrub commands on the specified files in the filesystem. This
    task will run through a sequence of operations, but it is not comprehensive
    on its own -- it doesn't manipulate the mds cache state to test on both
    in- and out-of-memory parts of the hierarchy. So it's designed to be run
    multiple times within a single test run, so that the test can manipulate
    memory state.

    Usage:
    mds_scrub_checks:
      mds_rank: 0
      path: path/to/test/dir
      client: 0
      run_seq: [0-9]+

    Increment the run_seq on subsequent invocations within a single test run;
    it uses that value to generate unique folder and file names.
    """

    MDSS_REQUIRED = 1

    def test_scrub_checks(self):
        self._checks(0)
        self._checks(1)

    def _checks(self, run_seq):
        mds_rank = 0
        test_dir = "scrub_test_path"

        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        log.info("Cloning repo into place")
        repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path)

        log.info("Initiating mds_scrub_checks on mds.{id_} test_path {path}, run_seq {seq}".format(
            id_=mds_rank, path=abs_test_path, seq=run_seq)
        )

        success_validator = lambda j, r: self.json_validator(j, r, "return_code", 0)

        nep = "{test_path}/i/dont/exist".format(test_path=abs_test_path)
        self.asok_command(mds_rank, "flush_path {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))
        self.tell_command(mds_rank, "scrub start {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))

        test_repo_path = "{test_path}/ceph-qa-suite".format(test_path=abs_test_path)
        dirpath = "{repo_path}/suites".format(repo_path=test_repo_path)

        if run_seq == 0:
            log.info("First run: flushing {dirpath}".format(dirpath=dirpath))
            command = "flush_path {dirpath}".format(dirpath=dirpath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {dirpath}".format(dirpath=dirpath)
        self.tell_command(mds_rank, command, success_validator)

        filepath = "{repo_path}/suites/fs/verify/validater/valgrind.yaml".format(
            repo_path=test_repo_path)
        if run_seq == 0:
            log.info("First run: flushing {filepath}".format(filepath=filepath))
            command = "flush_path {filepath}".format(filepath=filepath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {filepath}".format(filepath=filepath)
        self.tell_command(mds_rank, command, success_validator)

        if run_seq == 0:
            log.info("First run: flushing base dir /")
            command = "flush_path /"
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start /"
        self.tell_command(mds_rank, command, success_validator)

        new_dir = "{repo_path}/new_dir_{i}".format(repo_path=repo_path, i=run_seq)
        test_new_dir = "{repo_path}/new_dir_{i}".format(repo_path=test_repo_path,
                                                        i=run_seq)
        self.mount_a.run_shell(["mkdir", new_dir])
        command = "flush_path {dir}".format(dir=test_new_dir)
        self.asok_command(mds_rank, command, success_validator)

        new_file = "{repo_path}/new_file_{i}".format(repo_path=repo_path,
                                                     i=run_seq)
        test_new_file = "{repo_path}/new_file_{i}".format(repo_path=test_repo_path,
                                                          i=run_seq)
        self.mount_a.write_n_mb(new_file, 1)

        command = "flush_path {file}".format(file=test_new_file)
        self.asok_command(mds_rank, command, success_validator)

        # check that scrub fails on errors
        ino = self.mount_a.path_to_ino(new_file)
        rados_obj_name = "{ino:x}.00000000".format(ino=ino)
        command = "scrub start {file}".format(file=test_new_file)
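
        # Look up damage entries of the given type for this inode via
        # "damage ls", clear each one with "damage rm", and report whether
        # any were present.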
        def _check_and_clear_damage(ino, dtype):
            all_damage = self.fs.rank_tell(["damage", "ls"], mds_rank)
            damage = [d for d in all_damage if d['ino'] == ino and d['damage_type'] == dtype]
            for d in damage:
                self.fs.mon_manager.raw_cluster_cmd(
                    'tell', 'mds.{0}'.format(self.fs.get_active_names()[mds_rank]),
                    "damage", "rm", str(d['id']))
            return len(damage) > 0
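
        # Scenario: deliberately destroy the file's backtrace by removing the
        # "parent" xattr from its first data object, then verify that a scrub
        # detects and records "backtrace" damage for the inode.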
        # Missing parent xattr
        self.assertFalse(_check_and_clear_damage(ino, "backtrace"))
        self.fs.rados(["rmxattr", rados_obj_name, "parent"], pool=self.fs.get_data_pool_name())
        self.tell_command(mds_rank, command, success_validator)
        self.fs.wait_until_scrub_complete(sleep=5, timeout=30)
        self.assertTrue(_check_and_clear_damage(ino, "backtrace"))

        command = "flush_path /"
        self.asok_command(mds_rank, command, success_validator)

    def test_scrub_repair(self):
        mds_rank = 0
        test_dir = "scrub_repair_path"

        self.mount_a.run_shell(["mkdir", test_dir])
        self.mount_a.run_shell(["touch", "{0}/file".format(test_dir)])
        dir_objname = "{:x}.00000000".format(self.mount_a.path_to_ino(test_dir))

        self.mount_a.umount_wait()

        # flush journal entries to dirfrag objects, and expire journal
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_stop()

        # remove the dentry from dirfrag, cause incorrect fragstat/rstat
        self.fs.radosm(["rmomapkey", dir_objname, "file_head"])

        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount_wait()

        # fragstat indicates the directory is not empty, rmdir should fail
        with self.assertRaises(CommandFailedError) as ar:
            self.mount_a.run_shell(["rmdir", test_dir])
        self.assertEqual(ar.exception.exitstatus, 1)

        self.tell_command(mds_rank, "scrub start /{0} repair".format(test_dir),
                          lambda j, r: self.json_validator(j, r, "return_code", 0))

        # wait a few seconds for background repair
        time.sleep(30)

        # fragstat should be fixed
        self.mount_a.run_shell(["rmdir", test_dir])
    @staticmethod
    def json_validator(json_out, rc, element, expected_value):
        if rc != 0:
            return False, "asok command returned error {rc}".format(rc=rc)
        element_value = json_out.get(element)
        if element_value != expected_value:
            return False, "unexpectedly got {jv} instead of {ev}!".format(
                jv=element_value, ev=expected_value)
        return True, "Succeeded"

    def tell_command(self, mds_rank, command, validator):
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()
        jout = self.fs.rank_tell(command_list, mds_rank)

        log.info("command '{command}' returned '{jout}'".format(
            command=command, jout=jout))

        success, errstring = validator(jout, 0)
        if not success:
            raise AsokCommandFailedError(command, 0, jout, errstring)
        return jout

    def asok_command(self, mds_rank, command, validator):
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()

        # we just assume there's an active mds for every rank
        mds_id = self.fs.get_active_names()[mds_rank]
        proc = self.fs.mon_manager.admin_socket('mds', mds_id,
                                                command_list, check_status=False)
        rout = proc.exitstatus
        sout = proc.stdout.getvalue()

        if sout.strip():
            jout = json.loads(sout)
        else:
            jout = None

        log.info("command '{command}' got response code '{rout}' and stdout '{sout}'".format(
            command=command, rout=rout, sout=sout))

        success, errstring = validator(jout, rout)

        if not success:
            raise AsokCommandFailedError(command, rout, jout, errstring)

        return jout
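
    # The clone is idempotent: if stat() on the repo path succeeds the git
    # clone is skipped; only a failed stat (CommandFailedError) triggers it.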
    @staticmethod
    def clone_repo(client_mount, path):
        repo = "ceph-qa-suite"
        repo_path = os.path.join(path, repo)
        client_mount.run_shell(["mkdir", "-p", path])

        try:
            client_mount.stat(repo_path)
        except CommandFailedError:
            client_mount.run_shell([
                "git", "clone", '--branch', 'giant',
                "http://github.com/ceph/{repo}".format(repo=repo),
                "{path}/{repo}".format(path=path, repo=repo)
            ])

        return repo_path


class AsokCommandFailedError(Exception):
    """
    Exception thrown when we get an unexpected response
    on an admin socket command
    """

    def __init__(self, command, rc, json_out, errstring):
        self.command = command
        self.rc = rc
        self.json = json_out
        self.errstring = errstring

    def __str__(self):
        return "Admin socket: {command} failed with rc={rc} json output={json}, because '{es}'".format(
            command=self.command, rc=self.rc, json=self.json, es=self.errstring)