"""
MDS admin socket scrubbing-related tests.
"""
import json
import logging
import errno
import time
import os

from teuthology.exceptions import CommandFailedError

from tasks.cephfs.cephfs_test_case import CephFSTestCase

log = logging.getLogger(__name__)
class TestScrubControls(CephFSTestCase):
    """
    Test basic scrub control operations such as abort, pause and resume.
    """

    MDSS_REQUIRED = 2
    CLIENTS_REQUIRED = 1

    def _abort_scrub(self, expected):
        # Ask the MDS to abort any ongoing scrub; assert on the returned code.
        res = self.fs.rank_tell(["scrub", "abort"])
        self.assertEqual(res['return_code'], expected)

    def _pause_scrub(self, expected):
        # Pause an ongoing scrub; assert on the returned code.
        res = self.fs.rank_tell(["scrub", "pause"])
        self.assertEqual(res['return_code'], expected)

    def _resume_scrub(self, expected):
        # Resume a paused scrub; assert on the returned code.
        res = self.fs.rank_tell(["scrub", "resume"])
        self.assertEqual(res['return_code'], expected)

    def _get_scrub_status(self):
        # Return the scrub status JSON as reported by the MDS.
        return self.fs.rank_tell(["scrub", "status"])

    def _check_task_status(self, expected_status):
        # The mgr tracks per-MDS "scrub status" task strings; verify that the
        # entry for the first active MDS starts with the expected status.
        task_status = self.fs.get_task_status("scrub status")
        active = self.fs.get_active_names()
        log.debug("current active={0}".format(active))
        self.assertTrue(task_status[active[0]].startswith(expected_status))

    def test_scrub_abort(self):
        """Start a recursive scrub, abort it, and verify it goes idle."""
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        log.info("Cloning repo into place")
        TestScrubChecks.clone_repo(self.mount_a, client_path)

        out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # abort and verify
        self._abort_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("no active" in out_json['status'])

        # sleep enough to fetch updated task status
        time.sleep(10)
        self._check_task_status("idle")

    def test_scrub_pause_and_resume(self):
        """Start a recursive scrub, pause it, then resume and verify."""
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        log.info("Cloning repo into place")
        _ = TestScrubChecks.clone_repo(self.mount_a, client_path)

        out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        # sleep enough to fetch updated task status
        time.sleep(10)
        self._check_task_status("paused")

        # resume and verify
        self._resume_scrub(0)
        out_json = self._get_scrub_status()
        self.assertFalse("PAUSED" in out_json['status'])

    def test_scrub_pause_and_resume_with_abort(self):
        """Pause a scrub, abort it while paused, then resume and verify idle."""
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        log.info("Cloning repo into place")
        _ = TestScrubChecks.clone_repo(self.mount_a, client_path)

        out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        # sleep enough to fetch updated task status
        time.sleep(10)
        self._check_task_status("paused")

        # abort and verify: an abort on a paused scrub drains the queue but
        # the scrub stays paused, with nothing left in flight.
        self._abort_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])
        self.assertTrue("0 inodes" in out_json['status'])

        # sleep enough to fetch updated task status
        time.sleep(10)
        self._check_task_status("paused")

        # resume and verify
        self._resume_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("no active" in out_json['status'])

        # sleep enough to fetch updated task status
        time.sleep(10)
        self._check_task_status("idle")

    def test_scrub_task_status_on_mds_failover(self):
        """After MDS failover, the old rank's scrub task status is dropped."""
        # sleep enough to fetch updated task status
        time.sleep(10)

        (original_active, ) = self.fs.get_active_names()
        original_standbys = self.mds_cluster.get_standby_daemons()
        self._check_task_status("idle")

        # Kill the rank 0 MDS and wait for a standby to take over.
        self.fs.mds_stop(original_active)

        grace = float(self.fs.get_config("mds_beacon_grace", service_type="mon"))

        def promoted():
            active = self.fs.get_active_names()
            return active and active[0] in original_standbys

        log.info("Waiting for promotion of one of the original standbys {0}".format(
            original_standbys))
        self.wait_until_true(promoted, timeout=grace*2)

        mgr_beacon_grace = float(self.fs.get_config("mgr_service_beacon_grace", service_type="mon"))

        # The mgr should eventually stop reporting task status for the
        # no-longer-active daemon.
        def status_check():
            task_status = self.fs.get_task_status("scrub status")
            return original_active not in task_status
        self.wait_until_true(status_check, timeout=mgr_beacon_grace*2)
class TestScrubChecks(CephFSTestCase):
    """
    Run flush and scrub commands on the specified files in the filesystem. This
    task will run through a sequence of operations, but it is not comprehensive
    on its own -- it doesn't manipulate the mds cache state to test on both
    in- and out-of-memory parts of the hierarchy. So it's designed to be run
    multiple times within a single test run, so that the test can manipulate
    memory state.

    Usage:
    mds_scrub_checks:
      mds_rank: 0
      path: path/to/test/dir
      client: 0
      run_seq: [0-9]+

    Increment the run_seq on subsequent invocations within a single test run;
    it uses that value to generate unique folder and file names.
    """

    MDSS_REQUIRED = 1
    CLIENTS_REQUIRED = 1

    def test_scrub_checks(self):
        # Run the sequence twice so the second pass exercises already-cached
        # (in-memory) parts of the hierarchy.
        self._checks(0)
        self._checks(1)

    def _checks(self, run_seq):
        # One full pass of flush/scrub commands; run_seq makes names unique
        # across passes within a single test run.
        mds_rank = 0
        test_dir = "scrub_test_path"

        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        log.info("Cloning repo into place")
        repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path)

        log.info("Initiating mds_scrub_checks on mds.{id_} test_path {path}, run_seq {seq}".format(
            id_=mds_rank, path=abs_test_path, seq=run_seq)
        )

        success_validator = lambda j, r: self.json_validator(j, r, "return_code", 0)

        # Nonexistent path -> ENOENT from both flush and scrub.
        nep = "{test_path}/i/dont/exist".format(test_path=abs_test_path)
        self.asok_command(mds_rank, "flush_path {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))
        self.tell_command(mds_rank, "scrub start {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))

        test_repo_path = "{test_path}/ceph-qa-suite".format(test_path=abs_test_path)
        dirpath = "{repo_path}/suites".format(repo_path=test_repo_path)

        if run_seq == 0:
            log.info("First run: flushing {dirpath}".format(dirpath=dirpath))
            command = "flush_path {dirpath}".format(dirpath=dirpath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {dirpath}".format(dirpath=dirpath)
        self.tell_command(mds_rank, command, success_validator)

        filepath = "{repo_path}/suites/fs/verify/validater/valgrind.yaml".format(
            repo_path=test_repo_path)
        if run_seq == 0:
            log.info("First run: flushing {filepath}".format(filepath=filepath))
            command = "flush_path {filepath}".format(filepath=filepath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {filepath}".format(filepath=filepath)
        self.tell_command(mds_rank, command, success_validator)

        # Scrubbing an unmodified file a second time should skip validation.
        filepath = "{repo_path}/suites/fs/basic/clusters/fixed-3-cephfs.yaml". \
            format(repo_path=test_repo_path)
        command = "scrub start {filepath}".format(filepath=filepath)
        self.tell_command(mds_rank, command,
                          lambda j, r: self.json_validator(j, r, "performed_validation",
                                                           False))

        if run_seq == 0:
            log.info("First run: flushing base dir /")
            command = "flush_path /"
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start /"
        self.tell_command(mds_rank, command, success_validator)

        new_dir = "{repo_path}/new_dir_{i}".format(repo_path=repo_path, i=run_seq)
        test_new_dir = "{repo_path}/new_dir_{i}".format(repo_path=test_repo_path,
                                                        i=run_seq)
        self.mount_a.run_shell(["mkdir", new_dir])
        command = "flush_path {dir}".format(dir=test_new_dir)
        self.asok_command(mds_rank, command, success_validator)

        new_file = "{repo_path}/new_file_{i}".format(repo_path=repo_path,
                                                     i=run_seq)
        test_new_file = "{repo_path}/new_file_{i}".format(repo_path=test_repo_path,
                                                          i=run_seq)
        self.mount_a.write_n_mb(new_file, 1)

        command = "flush_path {file}".format(file=test_new_file)
        self.asok_command(mds_rank, command, success_validator)

        # check that scrub fails on errors
        ino = self.mount_a.path_to_ino(new_file)
        rados_obj_name = "{ino:x}.00000000".format(ino=ino)
        command = "scrub start {file}".format(file=test_new_file)

        # Missing parent xattr -> ENODATA
        self.fs.rados(["rmxattr", rados_obj_name, "parent"], pool=self.fs.get_data_pool_name())
        self.tell_command(mds_rank, command,
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENODATA))

        # Missing object -> ENOENT
        self.fs.rados(["rm", rados_obj_name], pool=self.fs.get_data_pool_name())
        self.tell_command(mds_rank, command,
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))

        command = "flush_path /"
        self.asok_command(mds_rank, command, success_validator)

    def test_scrub_repair(self):
        """Corrupt a dirfrag, then verify 'scrub start ... repair' fixes it."""
        mds_rank = 0
        test_dir = "scrub_repair_path"

        self.mount_a.run_shell(["sudo", "mkdir", test_dir])
        self.mount_a.run_shell(["sudo", "touch", "{0}/file".format(test_dir)])
        dir_objname = "{:x}.00000000".format(self.mount_a.path_to_ino(test_dir))

        self.mount_a.umount_wait()

        # flush journal entries to dirfrag objects, and expire journal
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_stop()

        # remove the dentry from dirfrag, cause incorrect fragstat/rstat
        self.fs.rados(["rmomapkey", dir_objname, "file_head"],
                      pool=self.fs.get_metadata_pool_name())

        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount_wait()

        # fragstat indicates the directory is not empty, rmdir should fail
        with self.assertRaises(CommandFailedError) as ar:
            self.mount_a.run_shell(["sudo", "rmdir", test_dir])
        self.assertEqual(ar.exception.exitstatus, 1)

        self.tell_command(mds_rank, "scrub start /{0} repair".format(test_dir),
                          lambda j, r: self.json_validator(j, r, "return_code", 0))

        # wait a few second for background repair
        time.sleep(10)

        # fragstat should be fixed
        self.mount_a.run_shell(["sudo", "rmdir", test_dir])

    @staticmethod
    def json_validator(json_out, rc, element, expected_value):
        # Return (success, message): success only when the command exited 0
        # and json_out[element] equals expected_value.
        if rc != 0:
            return False, "asok command returned error {rc}".format(rc=rc)
        element_value = json_out.get(element)
        if element_value != expected_value:
            return False, "unexpectedly got {jv} instead of {ev}!".format(
                jv=element_value, ev=expected_value)
        return True, "Succeeded"

    def tell_command(self, mds_rank, command, validator):
        # Run a tell command against the given rank and validate its JSON
        # output; raise AsokCommandFailedError on validation failure.
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()
        jout = self.fs.rank_tell(command_list, mds_rank)

        log.info("command '{command}' returned '{jout}'".format(
            command=command, jout=jout))

        success, errstring = validator(jout, 0)
        if not success:
            raise AsokCommandFailedError(command, 0, jout, errstring)
        return jout

    def asok_command(self, mds_rank, command, validator):
        # Run an admin-socket command against the MDS serving mds_rank and
        # validate (exit status, parsed JSON) with the supplied validator.
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()

        # we just assume there's an active mds for every rank
        mds_id = self.fs.get_active_names()[mds_rank]
        proc = self.fs.mon_manager.admin_socket('mds', mds_id,
                                                command_list, check_status=False)
        rout = proc.exitstatus
        sout = proc.stdout.getvalue()

        if sout.strip():
            jout = json.loads(sout)
        else:
            # No output (e.g. command failed before emitting JSON).
            jout = None

        log.info("command '{command}' got response code '{rout}' and stdout '{sout}'".format(
            command=command, rout=rout, sout=sout))

        success, errstring = validator(jout, rout)

        if not success:
            raise AsokCommandFailedError(command, rout, jout, errstring)

        return jout

    @staticmethod
    def clone_repo(client_mount, path):
        # Clone ceph-qa-suite under `path` (idempotent: skipped if already
        # present) and return the repo's path on the client mount.
        repo = "ceph-qa-suite"
        repo_path = os.path.join(path, repo)
        client_mount.run_shell(["mkdir", "-p", path])

        try:
            client_mount.stat(repo_path)
        except CommandFailedError:
            client_mount.run_shell([
                "git", "clone", '--branch', 'giant',
                "http://github.com/ceph/{repo}".format(repo=repo),
                "{path}/{repo}".format(path=path, repo=repo)
            ])

        return repo_path
class AsokCommandFailedError(Exception):
    """
    Exception thrown when we get an unexpected response
    on an admin socket command
    """

    def __init__(self, command, rc, json_out, errstring):
        self.command = command
        self.rc = rc
        self.json = json_out
        self.errstring = errstring

    def __str__(self):
        return "Admin socket: {command} failed with rc={rc} json output={json}, because '{es}'".format(
            command=self.command, rc=self.rc, json=self.json, es=self.errstring)