1 """
2 MDS admin socket scrubbing-related tests.
3 """
4 import json
5 import logging
6 import errno
7 import time
8 from teuthology.exceptions import CommandFailedError
9 from teuthology.contextutil import safe_while
10 import os
11 from tasks.cephfs.cephfs_test_case import CephFSTestCase
12
13 log = logging.getLogger(__name__)
14
15 class TestScrubControls(CephFSTestCase):
16 """
17 Test basic scrub control operations such as abort, pause and resume.
18 """
19
20 MDSS_REQUIRED = 2
21 CLIENTS_REQUIRED = 1
22
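    # Thin wrappers around the filesystem scrub interface: each issues a
    # single scrub abort/pause/resume command and asserts that the MDS
    # returned the expected return code.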
    def _abort_scrub(self, expected):
        res = self.fs.run_scrub(["abort"])
        self.assertEqual(res['return_code'], expected)

    def _pause_scrub(self, expected):
        res = self.fs.run_scrub(["pause"])
        self.assertEqual(res['return_code'], expected)

    def _resume_scrub(self, expected):
        res = self.fs.run_scrub(["resume"])
        self.assertEqual(res['return_code'], expected)

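    # The two helpers below poll the "scrub status" task status that the MDS
    # publishes in `ceph status`: the first waits until the active MDS reports
    # the expected state, the second waits until no scrub status is reported
    # for it at all. safe_while gives up (and raises) once its tries are
    # exhausted.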
    def _check_task_status(self, expected_status, timo=120):
        """ check scrub status for current active mds in ceph status """
        with safe_while(sleep=1, tries=timo, action='wait for task status') as proceed:
            while proceed():
                active = self.fs.get_active_names()
                log.debug("current active={0}".format(active))
                task_status = self.fs.get_task_status("scrub status")
                try:
                    if task_status[active[0]].startswith(expected_status):
                        return True
                except KeyError:
                    pass

    def _check_task_status_na(self, timo=120):
        """ check absence of scrub status in ceph status """
        with safe_while(sleep=1, tries=timo, action='wait for task status') as proceed:
            while proceed():
                active = self.fs.get_active_names()
                log.debug("current active={0}".format(active))
                task_status = self.fs.get_task_status("scrub status")
                if active[0] not in task_status:
                    return True

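    # Create 32 directories of 32 x 1MB files of random data so that a
    # recursive scrub has enough queued work to still be running when the
    # tests pause or abort it.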
    def create_scrub_data(self, test_dir):
        for i in range(32):
            dirname = "dir.{0}".format(i)
            dirpath = os.path.join(test_dir, dirname)
            self.mount_a.run_shell_payload(f"""
set -e
mkdir -p {dirpath}
for ((i = 0; i < 32; i++)); do
    dd if=/dev/urandom of={dirpath}/filename.$i bs=1M conv=fdatasync count=1
done
""")

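    # Start a recursive scrub, abort it, and verify that the scrub task
    # status eventually disappears from `ceph status`.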
    def test_scrub_abort(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # abort and verify
        self._abort_scrub(0)
        self.fs.wait_until_scrub_complete(sleep=5, timeout=30)

        # wait for the updated task status to be reflected in ceph status
        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_pause_and_resume(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # resume and verify
        self._resume_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertFalse("PAUSED" in out_json['status'])

        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_pause_and_resume_with_abort(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # abort and verify
        self._abort_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])
        self.assertTrue("0 inodes" in out_json['status'])

        # scrub status should still be paused...
        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # resume and verify
        self._resume_scrub(0)
        self.assertTrue(self.fs.wait_until_scrub_complete(sleep=5, timeout=30))

        checked = self._check_task_status_na()
        self.assertTrue(checked)

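    # Pause a scrub, stop the active MDS holding rank 0, wait for one of the
    # original standbys to be promoted, and verify that the new active MDS
    # does not report a stale scrub task status.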
    def test_scrub_task_status_on_mds_failover(self):
        (original_active, ) = self.fs.get_active_names()
        original_standbys = self.mds_cluster.get_standby_daemons()

        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.run_scrub(["start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self.fs.get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # kill the active rank 0 daemon
        self.fs.mds_stop(original_active)

        def promoted():
            active = self.fs.get_active_names()
            return active and active[0] in original_standbys

        log.info("Waiting for promotion of one of the original standbys {0}".format(
            original_standbys))
        self.wait_until_true(promoted, timeout=self.fs.beacon_timeout)

        self._check_task_status_na()

class TestScrubChecks(CephFSTestCase):
    """
    Run flush and scrub commands on the specified files in the filesystem. This
    task runs through a sequence of operations, but it is not comprehensive on
    its own -- it does not manipulate the MDS cache state to exercise both the
    in-memory and out-of-memory parts of the hierarchy. It is therefore designed
    to be run multiple times within a single test run, so that the caller can
    manipulate memory state between invocations.

    Usage:
    mds_scrub_checks:
      mds_rank: 0
      path: path/to/test/dir
      client: 0
      run_seq: [0-9]+

    Increment run_seq on subsequent invocations within a single test run;
    it is used to generate unique directory and file names.
    """

    MDSS_REQUIRED = 1
    CLIENTS_REQUIRED = 1

    def test_scrub_checks(self):
        self._checks(0)
        self._checks(1)

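    # Clone ceph-qa-suite into the mount, then exercise flush_path (via the
    # admin socket) and "scrub start" (via tell) against a nonexistent path,
    # a directory, a file, the filesystem root and freshly created entries.
    # Finally, remove the backtrace ("parent") xattr from the new file's
    # backing RADOS object and verify that a scrub records backtrace damage.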
    def _checks(self, run_seq):
        mds_rank = 0
        test_dir = "scrub_test_path"

        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        log.info("Cloning repo into place")
        repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path)

        log.info("Initiating mds_scrub_checks on mds.{id_} test_path {path}, run_seq {seq}".format(
            id_=mds_rank, path=abs_test_path, seq=run_seq)
        )

        success_validator = lambda j, r: self.json_validator(j, r, "return_code", 0)

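        # flushing or scrubbing a path that does not exist must fail with -ENOENT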
        nep = "{test_path}/i/dont/exist".format(test_path=abs_test_path)
        self.asok_command(mds_rank, "flush_path {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))
        self.tell_command(mds_rank, "scrub start {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))

        test_repo_path = "{test_path}/ceph-qa-suite".format(test_path=abs_test_path)
        dirpath = "{repo_path}/suites".format(repo_path=test_repo_path)

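        # each path is flushed only on the first run (run_seq == 0) but
        # scrubbed on every run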
        if run_seq == 0:
            log.info("First run: flushing {dirpath}".format(dirpath=dirpath))
            command = "flush_path {dirpath}".format(dirpath=dirpath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {dirpath}".format(dirpath=dirpath)
        self.tell_command(mds_rank, command, success_validator)

        filepath = "{repo_path}/suites/fs/verify/validater/valgrind.yaml".format(
            repo_path=test_repo_path)
        if run_seq == 0:
            log.info("First run: flushing {filepath}".format(filepath=filepath))
            command = "flush_path {filepath}".format(filepath=filepath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {filepath}".format(filepath=filepath)
        self.tell_command(mds_rank, command, success_validator)

        if run_seq == 0:
            log.info("First run: flushing base dir /")
            command = "flush_path /"
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start /"
        self.tell_command(mds_rank, command, success_validator)

        new_dir = "{repo_path}/new_dir_{i}".format(repo_path=repo_path, i=run_seq)
        test_new_dir = "{repo_path}/new_dir_{i}".format(repo_path=test_repo_path,
                                                        i=run_seq)
        self.mount_a.run_shell(["mkdir", new_dir])
        command = "flush_path {dir}".format(dir=test_new_dir)
        self.asok_command(mds_rank, command, success_validator)

        new_file = "{repo_path}/new_file_{i}".format(repo_path=repo_path,
                                                     i=run_seq)
        test_new_file = "{repo_path}/new_file_{i}".format(repo_path=test_repo_path,
                                                          i=run_seq)
        self.mount_a.write_n_mb(new_file, 1)

        command = "flush_path {file}".format(file=test_new_file)
        self.asok_command(mds_rank, command, success_validator)

        # check that scrub detects and records damage
        ino = self.mount_a.path_to_ino(new_file)
        rados_obj_name = "{ino:x}.00000000".format(ino=ino)
        command = "scrub start {file}".format(file=test_new_file)

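        # returns True if damage entries of the given type exist for this
        # inode, clearing any matching entries via "damage rm" as a side effect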
        def _check_and_clear_damage(ino, dtype):
            all_damage = self.fs.rank_tell(["damage", "ls"], mds_rank)
            damage = [d for d in all_damage if d['ino'] == ino and d['damage_type'] == dtype]
            for d in damage:
                self.fs.mon_manager.raw_cluster_cmd(
                    'tell', 'mds.{0}'.format(self.fs.get_active_names()[mds_rank]),
                    "damage", "rm", str(d['id']))
            return len(damage) > 0

        # Missing parent xattr
        self.assertFalse(_check_and_clear_damage(ino, "backtrace"))
        self.fs.rados(["rmxattr", rados_obj_name, "parent"], pool=self.fs.get_data_pool_name())
        self.tell_command(mds_rank, command, success_validator)
        self.fs.wait_until_scrub_complete(sleep=5, timeout=30)
        self.assertTrue(_check_and_clear_damage(ino, "backtrace"))

        command = "flush_path /"
        self.asok_command(mds_rank, command, success_validator)

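    # Build up stray inodes by snapshotting a directory full of hard-linked
    # files, deleting the files and then removing the snapshot; then run a
    # scrub over `path` with `flag` and verify that mds_cache.num_strays
    # drops back to zero once the tagged scrub completes.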
    def scrub_with_stray_evaluation(self, fs, mnt, path, flag, files=2000,
                                    _hard_links=3):
        fs.set_allow_new_snaps(True)

        test_dir = "stray_eval_dir"
        mnt.run_shell(["mkdir", test_dir])
        client_path = os.path.join(mnt.mountpoint, test_dir)
        mnt.create_n_files(fs_path=f"{test_dir}/file", count=files,
                           hard_links=_hard_links)
        mnt.run_shell(["mkdir", f"{client_path}/.snap/snap1-{test_dir}"])
        mnt.run_shell(f"find {client_path}/ -type f -delete")
        mnt.run_shell(["rmdir", f"{client_path}/.snap/snap1-{test_dir}"])
        perf_dump = fs.rank_tell(["perf", "dump"], 0)
        self.assertNotEqual(perf_dump.get('mds_cache').get('num_strays'),
                            0, "mdcache.num_strays is zero")

        log.info(
            f"num of strays: {perf_dump.get('mds_cache').get('num_strays')}")

        out_json = fs.run_scrub(["start", path, flag])
        self.assertNotEqual(out_json, None)
        self.assertEqual(out_json["return_code"], 0)

        self.assertEqual(
            fs.wait_until_scrub_complete(tag=out_json["scrub_tag"]), True)

        perf_dump = fs.rank_tell(["perf", "dump"], 0)
        self.assertEqual(int(perf_dump.get('mds_cache').get('num_strays')),
                         0, "mdcache.num_strays is non-zero")

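    # Corrupt a dirfrag by removing a dentry's omap key directly from the
    # backing RADOS object while the MDS is stopped; rmdir then fails because
    # the stale fragstat still counts the lost file, and a repair scrub is
    # expected to fix the fragstat so the directory can finally be removed.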
    def test_scrub_repair(self):
        mds_rank = 0
        test_dir = "scrub_repair_path"

        self.mount_a.run_shell(["mkdir", test_dir])
        self.mount_a.run_shell(["touch", "{0}/file".format(test_dir)])
        dir_objname = "{:x}.00000000".format(self.mount_a.path_to_ino(test_dir))

        self.mount_a.umount_wait()

        # flush journal entries to dirfrag objects, and expire the journal
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_stop()

        # remove the dentry from the dirfrag, causing an incorrect fragstat/rstat
        self.fs.radosm(["rmomapkey", dir_objname, "file_head"])

        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount_wait()

        # fragstat indicates the directory is not empty, so rmdir should fail
        with self.assertRaises(CommandFailedError) as ar:
            self.mount_a.run_shell(["rmdir", test_dir])
        self.assertEqual(ar.exception.exitstatus, 1)

        self.tell_command(mds_rank, "scrub start /{0} repair".format(test_dir),
                          lambda j, r: self.json_validator(j, r, "return_code", 0))

        # wait a few seconds for the background repair to complete
        time.sleep(10)

        # fragstat should be fixed
        self.mount_a.run_shell(["rmdir", test_dir])

    def test_stray_evaluation_with_scrub(self):
        """
        test that scrub can iterate over ~mdsdir and evaluate strays
        """
        self.scrub_with_stray_evaluation(self.fs, self.mount_a, "~mdsdir",
                                         "recursive")

    def test_flag_scrub_mdsdir(self):
        """
        test flag scrub_mdsdir
        """
        self.scrub_with_stray_evaluation(self.fs, self.mount_a, "/",
                                         "recursive,scrub_mdsdir")

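    # Validators receive (json_output, return_code) and return a
    # (success, message) tuple; json_validator checks a single element of the
    # JSON payload against an expected value.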
    @staticmethod
    def json_validator(json_out, rc, element, expected_value):
        if rc != 0:
            return False, "asok command returned error {rc}".format(rc=rc)
        element_value = json_out.get(element)
        if element_value != expected_value:
            return False, "unexpectedly got {jv} instead of {ev}!".format(
                jv=element_value, ev=expected_value)
        return True, "Succeeded"

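    # Run a command against the given MDS rank via "tell" and validate the
    # decoded JSON reply; the validator is always invoked with a return code
    # of 0 here.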
    def tell_command(self, mds_rank, command, validator):
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()
        jout = self.fs.rank_tell(command_list, mds_rank)

        log.info("command '{command}' returned '{jout}'".format(
            command=command, jout=jout))

        success, errstring = validator(jout, 0)
        if not success:
            raise AsokCommandFailedError(command, 0, jout, errstring)
        return jout

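    # Run a command on the admin socket of the active MDS holding the given
    # rank, tolerating a non-zero exit status (check_status=False), decode any
    # JSON on stdout, and hand both the JSON and the exit status to the
    # validator.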
    def asok_command(self, mds_rank, command, validator):
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()

        # we just assume there's an active mds for every rank
        mds_id = self.fs.get_active_names()[mds_rank]
        proc = self.fs.mon_manager.admin_socket('mds', mds_id,
                                                command_list, check_status=False)
        rout = proc.exitstatus
        sout = proc.stdout.getvalue()

        if sout.strip():
            jout = json.loads(sout)
        else:
            jout = None

        log.info("command '{command}' got response code '{rout}' and stdout '{sout}'".format(
            command=command, rout=rout, sout=sout))

        success, errstring = validator(jout, rout)

        if not success:
            raise AsokCommandFailedError(command, rout, jout, errstring)

        return jout

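    # Clone the ceph-qa-suite repository (giant branch) into the client mount,
    # unless it is already present, to provide a realistic directory tree for
    # the flush/scrub checks.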
    @staticmethod
    def clone_repo(client_mount, path):
        repo = "ceph-qa-suite"
        repo_path = os.path.join(path, repo)
        client_mount.run_shell(["mkdir", "-p", path])

        try:
            client_mount.stat(repo_path)
        except CommandFailedError:
            client_mount.run_shell([
                "git", "clone", '--branch', 'giant',
                "http://github.com/ceph/{repo}".format(repo=repo),
                "{path}/{repo}".format(path=path, repo=repo)
            ])

        return repo_path


class AsokCommandFailedError(Exception):
    """
    Exception raised when we get an unexpected response
    to an admin socket command.
    """

    def __init__(self, command, rc, json_out, errstring):
        self.command = command
        self.rc = rc
        self.json = json_out
        self.errstring = errstring

    def __str__(self):
        return "Admin socket: {command} failed with rc={rc} json output={json}, because '{es}'".format(
            command=self.command, rc=self.rc, json=self.json, es=self.errstring)