1"""
2MDS admin socket scrubbing-related tests.
3"""
4import json
5import logging
6import errno
7import time
8from teuthology.exceptions import CommandFailedError
f91f0fd5 9from teuthology.contextutil import safe_while
7c673cae
FG
10import os
11from tasks.cephfs.cephfs_test_case import CephFSTestCase
12
13log = logging.getLogger(__name__)
14
11fdf7f2
TL
15class TestScrubControls(CephFSTestCase):
16 """
17 Test basic scrub control operations such as abort, pause and resume.
18 """
19
1911f103 20 MDSS_REQUIRED = 2
11fdf7f2
TL
21 CLIENTS_REQUIRED = 1
22
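    # The helpers below drive scrubbing through the MDS "scrub" tell
    # interface via self.fs.run_scrub(); by hand this roughly corresponds
    # to `ceph tell mds.<fsname>:0 scrub abort|pause|resume` (the exact
    # CLI form may vary by release).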
    def _abort_scrub(self, expected):
        res = self.fs.run_scrub(["abort"])
        self.assertEqual(res['return_code'], expected)

    def _pause_scrub(self, expected):
        res = self.fs.run_scrub(["pause"])
        self.assertEqual(res['return_code'], expected)

    def _resume_scrub(self, expected):
        res = self.fs.run_scrub(["resume"])
        self.assertEqual(res['return_code'], expected)
    def _check_task_status(self, expected_status, timo=120):
        """ check scrub status for current active mds in ceph status """
        with safe_while(sleep=1, tries=timo, action='wait for task status') as proceed:
            while proceed():
                active = self.fs.get_active_names()
                log.debug("current active={0}".format(active))
                task_status = self.fs.get_task_status("scrub status")
                try:
                    if task_status[active[0]].startswith(expected_status):
                        return True
                except KeyError:
                    pass

    def _check_task_status_na(self, timo=120):
        """ check absence of scrub status in ceph status """
        with safe_while(sleep=1, tries=timo, action='wait for task status') as proceed:
            while proceed():
                active = self.fs.get_active_names()
                log.debug("current active={0}".format(active))
                task_status = self.fs.get_task_status("scrub status")
                if active[0] not in task_status:
                    return True

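    # Populate enough data (32 directories of 32 x 1 MiB files) that a
    # recursive scrub stays in flight long enough to be paused or aborted.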
    def create_scrub_data(self, test_dir):
        for i in range(32):
            dirname = "dir.{0}".format(i)
            dirpath = os.path.join(test_dir, dirname)
            self.mount_a.run_shell_payload(f"""
set -e
mkdir -p {dirpath}
for ((i = 0; i < 32; i++)); do
    dd if=/dev/urandom of={dirpath}/filename.$i bs=1M conv=fdatasync count=1
done
""")

    def test_scrub_abort(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # abort and verify
        self._abort_scrub(0)
        self.fs.wait_until_scrub_complete(sleep=5, timeout=30)

        # wait for the scrub task status entry to be cleared
        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_pause_and_resume(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # resume and verify
        self._resume_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertFalse("PAUSED" in out_json['status'])

        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_pause_and_resume_with_abort(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # abort and verify: the queue drains but the scrub remains paused
        self._abort_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])
        self.assertTrue("0 inodes" in out_json['status'])

        # scrub status should still be paused...
        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # resume and verify
        self._resume_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("no active" in out_json['status'])

        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_task_status_on_mds_failover(self):
        (original_active, ) = self.fs.get_active_names()
        original_standbys = self.mds_cluster.get_standby_daemons()

        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # kill rank 0
        self.fs.mds_stop(original_active)

        def promoted():
            active = self.fs.get_active_names()
            return active and active[0] in original_standbys

        log.info("Waiting for promotion of one of the original standbys {0}".format(
            original_standbys))
        self.wait_until_true(promoted, timeout=self.fs.beacon_timeout)

        # after failover the scrub task status should no longer be reported
        self._check_task_status_na()


class TestScrubChecks(CephFSTestCase):
    """
    Run flush and scrub commands on the specified files in the filesystem. This
    task runs through a sequence of operations, but it is not comprehensive
    on its own -- it doesn't manipulate the mds cache state to test on both
    in- and out-of-memory parts of the hierarchy. It is therefore designed to
    be run multiple times within a single test run, so that the test can
    manipulate memory state between invocations.

    Usage:
    mds_scrub_checks:
      mds_rank: 0
      path: path/to/test/dir
      client: 0
      run_seq: [0-9]+

    Increment the run_seq on subsequent invocations within a single test run;
    it uses that value to generate unique folder and file names.
    """

    MDSS_REQUIRED = 1
    CLIENTS_REQUIRED = 1

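    # In this form the sequence is driven directly by test_scrub_checks(),
    # which runs _checks() twice (run_seq 0 and 1) rather than relying on
    # the teuthology task configuration described above.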
    def test_scrub_checks(self):
        self._checks(0)
        self._checks(1)

    def _checks(self, run_seq):
        mds_rank = 0
        test_dir = "scrub_test_path"

        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        log.info("Cloning repo into place")
        repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path)

        log.info("Initiating mds_scrub_checks on mds.{id_} test_path {path}, run_seq {seq}".format(
            id_=mds_rank, path=abs_test_path, seq=run_seq))

        success_validator = lambda j, r: self.json_validator(j, r, "return_code", 0)

        # flushing or scrubbing a nonexistent path should fail with ENOENT
        nep = "{test_path}/i/dont/exist".format(test_path=abs_test_path)
        self.asok_command(mds_rank, "flush_path {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))
        self.tell_command(mds_rank, "scrub start {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))

        test_repo_path = "{test_path}/ceph-qa-suite".format(test_path=abs_test_path)
        dirpath = "{repo_path}/suites".format(repo_path=test_repo_path)

        if run_seq == 0:
            log.info("First run: flushing {dirpath}".format(dirpath=dirpath))
            command = "flush_path {dirpath}".format(dirpath=dirpath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {dirpath}".format(dirpath=dirpath)
        self.tell_command(mds_rank, command, success_validator)

        filepath = "{repo_path}/suites/fs/verify/validater/valgrind.yaml".format(
            repo_path=test_repo_path)
        if run_seq == 0:
            log.info("First run: flushing {filepath}".format(filepath=filepath))
            command = "flush_path {filepath}".format(filepath=filepath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {filepath}".format(filepath=filepath)
        self.tell_command(mds_rank, command, success_validator)

        if run_seq == 0:
            log.info("First run: flushing base dir /")
            command = "flush_path /"
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start /"
        self.tell_command(mds_rank, command, success_validator)

        new_dir = "{repo_path}/new_dir_{i}".format(repo_path=repo_path, i=run_seq)
        test_new_dir = "{repo_path}/new_dir_{i}".format(repo_path=test_repo_path,
                                                        i=run_seq)
        self.mount_a.run_shell(["mkdir", new_dir])
        command = "flush_path {dir}".format(dir=test_new_dir)
        self.asok_command(mds_rank, command, success_validator)

        new_file = "{repo_path}/new_file_{i}".format(repo_path=repo_path,
                                                     i=run_seq)
        test_new_file = "{repo_path}/new_file_{i}".format(repo_path=test_repo_path,
                                                          i=run_seq)
        self.mount_a.write_n_mb(new_file, 1)

        command = "flush_path {file}".format(file=test_new_file)
        self.asok_command(mds_rank, command, success_validator)

        # check that scrub fails on errors
        ino = self.mount_a.path_to_ino(new_file)
        rados_obj_name = "{ino:x}.00000000".format(ino=ino)
        command = "scrub start {file}".format(file=test_new_file)

        def _check_and_clear_damage(ino, dtype):
            all_damage = self.fs.rank_tell(["damage", "ls"], mds_rank)
            damage = [d for d in all_damage if d['ino'] == ino and d['damage_type'] == dtype]
            for d in damage:
                self.fs.mon_manager.raw_cluster_cmd(
                    'tell', 'mds.{0}'.format(self.fs.get_active_names()[mds_rank]),
                    "damage", "rm", str(d['id']))
            return len(damage) > 0

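        # A file's backtrace is stored in the "parent" xattr of its first
        # data object (<ino>.00000000); stripping it and then scrubbing the
        # file should surface a "backtrace" damage entry.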
        # Missing parent xattr
        self.assertFalse(_check_and_clear_damage(ino, "backtrace"))
        self.fs.rados(["rmxattr", rados_obj_name, "parent"], pool=self.fs.get_data_pool_name())
        self.tell_command(mds_rank, command, success_validator)
        self.fs.wait_until_scrub_complete(sleep=5, timeout=30)
        self.assertTrue(_check_and_clear_damage(ino, "backtrace"))

        command = "flush_path /"
        self.asok_command(mds_rank, command, success_validator)

    def test_scrub_repair(self):
        mds_rank = 0
        test_dir = "scrub_repair_path"

        self.mount_a.run_shell(["mkdir", test_dir])
        self.mount_a.run_shell(["touch", "{0}/file".format(test_dir)])
        dir_objname = "{:x}.00000000".format(self.mount_a.path_to_ino(test_dir))

        self.mount_a.umount_wait()

        # flush journal entries to dirfrag objects, and expire journal
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_stop()

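        # each dirfrag is a RADOS omap object whose keys are its dentries;
        # "file_head" is the head-snapshot key for the dentry "file"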
        # remove the dentry from the dirfrag, causing incorrect fragstat/rstat
        self.fs.radosm(["rmomapkey", dir_objname, "file_head"])

        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount_wait()

        # fragstat indicates the directory is not empty, so rmdir should fail
        with self.assertRaises(CommandFailedError) as ar:
            self.mount_a.run_shell(["rmdir", test_dir])
        self.assertEqual(ar.exception.exitstatus, 1)

        self.tell_command(mds_rank, "scrub start /{0} repair".format(test_dir),
                          lambda j, r: self.json_validator(j, r, "return_code", 0))

        # wait a few seconds for the background repair to complete
        time.sleep(10)

        # fragstat should be fixed
        self.mount_a.run_shell(["rmdir", test_dir])

    @staticmethod
    def json_validator(json_out, rc, element, expected_value):
        if rc != 0:
            return False, "asok command returned error {rc}".format(rc=rc)
        element_value = json_out.get(element)
        if element_value != expected_value:
            return False, "unexpectedly got {jv} instead of {ev}!".format(
                jv=element_value, ev=expected_value)
        return True, "Succeeded"

    def tell_command(self, mds_rank, command, validator):
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()
        jout = self.fs.rank_tell(command_list, mds_rank)

        log.info("command '{command}' returned '{jout}'".format(
            command=command, jout=jout))

        success, errstring = validator(jout, 0)
        if not success:
            raise AsokCommandFailedError(command, 0, jout, errstring)
        return jout

    def asok_command(self, mds_rank, command, validator):
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()

        # we just assume there's an active mds for every rank
        mds_id = self.fs.get_active_names()[mds_rank]
        proc = self.fs.mon_manager.admin_socket('mds', mds_id,
                                                command_list, check_status=False)
        rout = proc.exitstatus
        sout = proc.stdout.getvalue()

        if sout.strip():
            jout = json.loads(sout)
        else:
            jout = None

        log.info("command '{command}' got response code '{rout}' and stdout '{sout}'".format(
            command=command, rout=rout, sout=sout))

        success, errstring = validator(jout, rout)

        if not success:
            raise AsokCommandFailedError(command, rout, jout, errstring)

        return jout

    @staticmethod
    def clone_repo(client_mount, path):
        repo = "ceph-qa-suite"
        repo_path = os.path.join(path, repo)
        client_mount.run_shell(["mkdir", "-p", path])

        # clone only if the repo is not already in place
        try:
            client_mount.stat(repo_path)
        except CommandFailedError:
            client_mount.run_shell([
                "git", "clone", '--branch', 'giant',
                "http://github.com/ceph/{repo}".format(repo=repo),
                "{path}/{repo}".format(path=path, repo=repo)
            ])

        return repo_path


class AsokCommandFailedError(Exception):
    """
    Exception raised when we get an unexpected response
    from an admin socket or tell command.
    """

    def __init__(self, command, rc, json_out, errstring):
        self.command = command
        self.rc = rc
        self.json = json_out
        self.errstring = errstring

    def __str__(self):
        return "Admin socket: {command} failed with rc={rc} json output={json}, because '{es}'".format(
            command=self.command, rc=self.rc, json=self.json, es=self.errstring)