"""
MDS admin socket scrubbing-related tests.
"""
import json
import logging
import errno
import time
import os

from teuthology.exceptions import CommandFailedError
from teuthology.contextutil import safe_while

from tasks.cephfs.cephfs_test_case import CephFSTestCase

log = logging.getLogger(__name__)


class TestScrubControls(CephFSTestCase):
    """
    Test basic scrub control operations such as abort, pause and resume.
    """

    MDSS_REQUIRED = 2
    CLIENTS_REQUIRED = 1
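
    # The control commands below are delivered via self.fs.rank_tell(), i.e.
    # "ceph tell" aimed at an MDS rank. A rough CLI equivalent (assumed
    # syntax, with a filesystem named "cephfs" and rank 0) would be:
    #
    #   ceph tell mds.cephfs:0 scrub start / recursive
    #   ceph tell mds.cephfs:0 scrub pause
    #   ceph tell mds.cephfs:0 scrub resume
    #   ceph tell mds.cephfs:0 scrub abort
    #   ceph tell mds.cephfs:0 scrub status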

    def _abort_scrub(self, expected):
        res = self.fs.rank_tell(["scrub", "abort"])
        self.assertEqual(res['return_code'], expected)

    def _pause_scrub(self, expected):
        res = self.fs.rank_tell(["scrub", "pause"])
        self.assertEqual(res['return_code'], expected)

    def _resume_scrub(self, expected):
        res = self.fs.rank_tell(["scrub", "resume"])
        self.assertEqual(res['return_code'], expected)

    def _get_scrub_status(self):
        return self.fs.rank_tell(["scrub", "status"])
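
    # Hypothetical convenience helper, not part of the original tests: a
    # sketch of how a caller could poll _get_scrub_status() until the queue
    # drains, assuming the 'status' field reports "no active scrubs running"
    # once idle (the substring the tests below assert on).
    def _wait_for_scrub_idle_example(self, tries=120):
        with safe_while(sleep=1, tries=tries,
                        action='wait for scrub to go idle') as proceed:
            while proceed():
                if "no active" in self._get_scrub_status()['status']:
                    return True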

    def _check_task_status(self, expected_status, timo=120):
        """ check scrub status for current active mds in ceph status """
        with safe_while(sleep=1, tries=120, action='wait for task status') as proceed:
            while proceed():
                active = self.fs.get_active_names()
                log.debug("current active={0}".format(active))
                task_status = self.fs.get_task_status("scrub status")
                try:
                    if task_status[active[0]].startswith(expected_status):
                        return True
                except KeyError:
                    pass

    def _check_task_status_na(self, timo=120):
        """ check absence of scrub status in ceph status """
        with safe_while(sleep=1, tries=120, action='wait for task status') as proceed:
            while proceed():
                active = self.fs.get_active_names()
                log.debug("current active={0}".format(active))
                task_status = self.fs.get_task_status("scrub status")
                if not active[0] in task_status:
                    return True
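
    # Note: get_task_status() reads the per-daemon task status that the MDS
    # publishes with its beacon -- the same "scrub status" line that
    # "ceph status" renders -- so these helpers track exactly what an
    # operator would see during a scrub.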

    def create_scrub_data(self, test_dir):
        for i in range(32):
            dirname = "dir.{0}".format(i)
            dirpath = os.path.join(test_dir, dirname)
            self.mount_a.run_shell_payload(f"""
set -e
mkdir {dirpath}
for ((i = 0; i < 32; i++)); do
    dd if=/dev/urandom of={dirpath}/filename.$i bs=1M conv=fdatasync count=1
done
""")
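
    # With the loop bounds reconstructed above, each call writes 32
    # directories x 32 files x 1 MiB = 1 GiB of random data -- enough work
    # that the pause/abort commands in the tests below race against a scrub
    # that is genuinely still in flight.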

    def test_scrub_abort(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # abort and verify
        self._abort_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("no active" in out_json['status'])

        # sleep enough to fetch updated task status
        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_pause_and_resume(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        self.create_scrub_data(test_dir)

        out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # resume and verify
        self._resume_scrub(0)
        out_json = self._get_scrub_status()
        self.assertFalse("PAUSED" in out_json['status'])

        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_pause_and_resume_with_abort(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # abort and verify
        self._abort_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])
        self.assertTrue("0 inodes" in out_json['status'])

        # scrub status should still be paused...
        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # resume and verify
        self._resume_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("no active" in out_json['status'])

        checked = self._check_task_status_na()
        self.assertTrue(checked)
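
    # Takeaway from the assertions above: aborting while paused drains the
    # queue ("0 inodes") but leaves the scrub in the PAUSED state; only the
    # subsequent resume returns the status to "no active scrubs running".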

    def test_scrub_task_status_on_mds_failover(self):
        (original_active, ) = self.fs.get_active_names()
        original_standbys = self.mds_cluster.get_standby_daemons()

        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # stop the active mds
        self.fs.mds_stop(original_active)

        grace = float(self.fs.get_config("mds_beacon_grace", service_type="mon"))

        def promoted():
            active = self.fs.get_active_names()
            return active and active[0] in original_standbys

        log.info("Waiting for promotion of one of the original standbys {0}".format(
            original_standbys))
        self.wait_until_true(promoted, timeout=grace*2)

        self._check_task_status_na()
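
    # The grace*2 timeout gives the monitors time to miss mds_beacon_grace
    # worth of beacons from the stopped daemon, mark it failed, and promote
    # a standby, before we check that its scrub task status is gone.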


class TestScrubChecks(CephFSTestCase):
    """
    Run flush and scrub commands on the specified files in the filesystem. This
    task will run through a sequence of operations, but it is not comprehensive
    on its own -- it doesn't manipulate the mds cache state to test on both
    in- and out-of-memory parts of the hierarchy. So it's designed to be run
    multiple times within a single test run, so that the test can manipulate
    memory state.

    Usage:
    mds_scrub_checks:
      mds_rank: 0
      path: path/to/test/dir
      client: 0
      run_seq: [0-9]+

    Increment the run_seq on subsequent invocations within a single test run;
    it uses that value to generate unique folder and file names.
    """

    MDSS_REQUIRED = 1
    CLIENTS_REQUIRED = 1

    def test_scrub_checks(self):
        self._checks(0)
        self._checks(1)

    def _checks(self, run_seq):
        mds_rank = 0
        test_dir = "scrub_test_path"

        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        log.info("Cloning repo into place")
        repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path)

        log.info("Initiating mds_scrub_checks on mds.{id_} test_path {path}, run_seq {seq}".format(
            id_=mds_rank, path=abs_test_path, seq=run_seq))

        success_validator = lambda j, r: self.json_validator(j, r, "return_code", 0)

        nep = "{test_path}/i/dont/exist".format(test_path=abs_test_path)
        self.asok_command(mds_rank, "flush_path {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))
        self.tell_command(mds_rank, "scrub start {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))

        test_repo_path = "{test_path}/ceph-qa-suite".format(test_path=abs_test_path)
        dirpath = "{repo_path}/suites".format(repo_path=test_repo_path)

        if run_seq == 0:
            log.info("First run: flushing {dirpath}".format(dirpath=dirpath))
            command = "flush_path {dirpath}".format(dirpath=dirpath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {dirpath}".format(dirpath=dirpath)
        self.tell_command(mds_rank, command, success_validator)

        filepath = "{repo_path}/suites/fs/verify/validater/valgrind.yaml".format(
            repo_path=test_repo_path)
        if run_seq == 0:
            log.info("First run: flushing {filepath}".format(filepath=filepath))
            command = "flush_path {filepath}".format(filepath=filepath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {filepath}".format(filepath=filepath)
        self.tell_command(mds_rank, command, success_validator)

        filepath = "{repo_path}/suites/fs/basic/clusters/fixed-3-cephfs.yaml". \
            format(repo_path=test_repo_path)
        command = "scrub start {filepath}".format(filepath=filepath)
        self.tell_command(mds_rank, command,
                          lambda j, r: self.json_validator(j, r, "performed_validation",
                                                           False))

        if run_seq == 0:
            log.info("First run: flushing base dir /")
            command = "flush_path /"
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start /"
        self.tell_command(mds_rank, command, success_validator)

        new_dir = "{repo_path}/new_dir_{i}".format(repo_path=repo_path, i=run_seq)
        test_new_dir = "{repo_path}/new_dir_{i}".format(repo_path=test_repo_path,
                                                        i=run_seq)
        self.mount_a.run_shell(["mkdir", new_dir])
        command = "flush_path {dir}".format(dir=test_new_dir)
        self.asok_command(mds_rank, command, success_validator)

        new_file = "{repo_path}/new_file_{i}".format(repo_path=repo_path,
                                                     i=run_seq)
        test_new_file = "{repo_path}/new_file_{i}".format(repo_path=test_repo_path,
                                                          i=run_seq)
        self.mount_a.write_n_mb(new_file, 1)

        command = "flush_path {file}".format(file=test_new_file)
        self.asok_command(mds_rank, command, success_validator)

        # check that scrub fails on errors
        ino = self.mount_a.path_to_ino(new_file)
        rados_obj_name = "{ino:x}.00000000".format(ino=ino)
        command = "scrub start {file}".format(file=test_new_file)

        # Missing parent xattr -> ENODATA
        self.fs.rados(["rmxattr", rados_obj_name, "parent"], pool=self.fs.get_data_pool_name())
        self.tell_command(mds_rank, command,
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENODATA))

        # Missing object -> ENOENT
        self.fs.rados(["rm", rados_obj_name], pool=self.fs.get_data_pool_name())
        self.tell_command(mds_rank, command,
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))

        command = "flush_path /"
        self.asok_command(mds_rank, command, success_validator)

    def test_scrub_repair(self):
        mds_rank = 0
        test_dir = "scrub_repair_path"

        self.mount_a.run_shell(["sudo", "mkdir", test_dir])
        self.mount_a.run_shell(["sudo", "touch", "{0}/file".format(test_dir)])
        dir_objname = "{:x}.00000000".format(self.mount_a.path_to_ino(test_dir))

        self.mount_a.umount_wait()

        # flush journal entries to dirfrag objects, and expire journal
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_stop()

        # remove the dentry from dirfrag, cause incorrect fragstat/rstat
        self.fs.rados(["rmomapkey", dir_objname, "file_head"],
                      pool=self.fs.get_metadata_pool_name())

        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount_wait()

        # fragstat indicates the directory is not empty, rmdir should fail
        with self.assertRaises(CommandFailedError) as ar:
            self.mount_a.run_shell(["sudo", "rmdir", test_dir])
        self.assertEqual(ar.exception.exitstatus, 1)

        self.tell_command(mds_rank, "scrub start /{0} repair".format(test_dir),
                          lambda j, r: self.json_validator(j, r, "return_code", 0))

        # wait a few seconds for background repair
        time.sleep(30)

        # fragstat should be fixed
        self.mount_a.run_shell(["sudo", "rmdir", test_dir])

    @staticmethod
    def json_validator(json_out, rc, element, expected_value):
        if rc != 0:
            return False, "asok command returned error {rc}".format(rc=rc)
        element_value = json_out.get(element)
        if element_value != expected_value:
            return False, "unexpectedly got {jv} instead of {ev}!".format(
                jv=element_value, ev=expected_value)
        return True, "Succeeded"
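
    # Call shape, for illustration:
    #   json_validator({"return_code": 0}, 0, "return_code", 0)
    # returns (True, "Succeeded"); a nonzero rc fails without ever
    # inspecting the JSON payload.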

    def tell_command(self, mds_rank, command, validator):
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()
        jout = self.fs.rank_tell(command_list, mds_rank)

        log.info("command '{command}' returned '{jout}'".format(
            command=command, jout=jout))

        success, errstring = validator(jout, 0)
        if not success:
            raise AsokCommandFailedError(command, 0, jout, errstring)
        return jout

    def asok_command(self, mds_rank, command, validator):
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()

        # we just assume there's an active mds for every rank
        mds_id = self.fs.get_active_names()[mds_rank]
        proc = self.fs.mon_manager.admin_socket('mds', mds_id,
                                                command_list, check_status=False)
        rout = proc.exitstatus
        sout = proc.stdout.getvalue()

        if sout.strip():
            jout = json.loads(sout)
        else:
            jout = None

        log.info("command '{command}' got response code '{rout}' and stdout '{sout}'".format(
            command=command, rout=rout, sout=sout))

        success, errstring = validator(jout, rout)

        if not success:
            raise AsokCommandFailedError(command, rout, jout, errstring)

        return jout
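
    # Note the two delivery paths: tell_command() goes through the mon via
    # "ceph tell" addressed to an MDS rank, while asok_command() talks
    # directly to the daemon's local admin socket, which is why it must
    # first resolve the rank to a concrete mds_id.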

    @staticmethod
    def clone_repo(client_mount, path):
        repo = "ceph-qa-suite"
        repo_path = os.path.join(path, repo)
        client_mount.run_shell(["mkdir", "-p", path])

        try:
            client_mount.stat(repo_path)
        except CommandFailedError:
            client_mount.run_shell([
                "git", "clone", '--branch', 'giant',
                "http://github.com/ceph/{repo}".format(repo=repo),
                "{path}/{repo}".format(path=path, repo=repo)
            ])

        return repo_path


class AsokCommandFailedError(Exception):
    """
    Exception thrown when we get an unexpected response
    on an admin socket command
    """

    def __init__(self, command, rc, json_out, errstring):
        self.command = command
        self.rc = rc
        self.json = json_out
        self.errstring = errstring

    def __str__(self):
        return "Admin socket: {command} failed with rc={rc} json output={json}, because '{es}'".format(
            command=self.command, rc=self.rc, json=self.json, es=self.errstring)