1 """
2 MDS admin socket scrubbing-related tests.
3 """
4 import json
5 import logging
6 import errno
7 import time
8 from teuthology.exceptions import CommandFailedError
9 from teuthology.contextutil import safe_while
10 import os
11 from tasks.cephfs.cephfs_test_case import CephFSTestCase
12
13 log = logging.getLogger(__name__)
14
15 class TestScrubControls(CephFSTestCase):
16 """
17 Test basic scrub control operations such as abort, pause and resume.
18 """
19
20 MDSS_REQUIRED = 2
21 CLIENTS_REQUIRED = 1
22
    def _abort_scrub(self, expected):
        res = self.fs.rank_tell(["scrub", "abort"])
        self.assertEqual(res['return_code'], expected)

    def _pause_scrub(self, expected):
        res = self.fs.rank_tell(["scrub", "pause"])
        self.assertEqual(res['return_code'], expected)

    def _resume_scrub(self, expected):
        res = self.fs.rank_tell(["scrub", "resume"])
        self.assertEqual(res['return_code'], expected)

    def _get_scrub_status(self):
        return self.fs.rank_tell(["scrub", "status"])

    def _check_task_status(self, expected_status, timo=120):
        """ check scrub status for current active mds in ceph status """
        with safe_while(sleep=1, tries=timo, action='wait for task status') as proceed:
            while proceed():
                active = self.fs.get_active_names()
                log.debug("current active={0}".format(active))
                task_status = self.fs.get_task_status("scrub status")
                try:
                    if task_status[active[0]].startswith(expected_status):
                        return True
                except KeyError:
                    pass

    def _check_task_status_na(self, timo=120):
        """ check absence of scrub status in ceph status """
        with safe_while(sleep=1, tries=timo, action='wait for task status') as proceed:
            while proceed():
                active = self.fs.get_active_names()
                log.debug("current active={0}".format(active))
                task_status = self.fs.get_task_status("scrub status")
                if active[0] not in task_status:
                    return True

    def create_scrub_data(self, test_dir):
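        # lay down 32 directories of 32 x 1MB files each, so that a recursive
        # scrub has enough inodes queued to stay active while the tests issue
        # pause/abort/resume commands against it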
        for i in range(32):
            dirname = "dir.{0}".format(i)
            dirpath = os.path.join(test_dir, dirname)
            self.mount_a.run_shell_payload(f"""
set -e
mkdir -p {dirpath}
for ((i = 0; i < 32; i++)); do
    dd if=/dev/urandom of={dirpath}/filename.$i bs=1M conv=fdatasync count=1
done
""")

    def test_scrub_abort(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # abort and verify
        self._abort_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("no active" in out_json['status'])

        # wait long enough for the updated task status to be reported
        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_pause_and_resume(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        self.create_scrub_data(test_dir)

        out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # resume and verify
        self._resume_scrub(0)
        out_json = self._get_scrub_status()
        self.assertFalse("PAUSED" in out_json['status'])

        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_pause_and_resume_with_abort(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

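        # aborting while paused should drain the queue of pending inodes, but
        # the scrubber itself stays paused until an explicit resume; that is
        # what the assertions below verify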
        # abort and verify
        self._abort_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])
        self.assertTrue("0 inodes" in out_json['status'])

        # scrub status should still be paused...
        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # resume and verify
        self._resume_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("no active" in out_json['status'])

        checked = self._check_task_status_na()
        self.assertTrue(checked)

    def test_scrub_task_status_on_mds_failover(self):
        (original_active, ) = self.fs.get_active_names()
        original_standbys = self.mds_cluster.get_standby_daemons()

        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        self.create_scrub_data(test_dir)

        out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self._get_scrub_status()
        self.assertTrue("PAUSED" in out_json['status'])

        checked = self._check_task_status("paused")
        self.assertTrue(checked)

        # kill the rank 0 MDS so that one of the standbys takes over
        self.fs.mds_stop(original_active)

        grace = float(self.fs.get_config("mds_beacon_grace", service_type="mon"))

        def promoted():
            active = self.fs.get_active_names()
            return active and active[0] in original_standbys

        log.info("Waiting for promotion of one of the original standbys {0}".format(
            original_standbys))
        self.wait_until_true(promoted, timeout=grace*2)

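        # the scrub (and its paused state) does not survive the failover, so
        # the new active MDS should not be reporting any scrub task status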
        self._check_task_status_na()


class TestScrubChecks(CephFSTestCase):
    """
    Run flush and scrub commands on the specified files in the filesystem.
    A single pass is not comprehensive on its own -- it does not manipulate
    the MDS cache state to exercise both the in-memory and out-of-memory
    parts of the hierarchy -- so it is designed to be run multiple times
    within a single test run, with the caller manipulating memory state
    between invocations.

    Usage:
    mds_scrub_checks:
      mds_rank: 0
      path: path/to/test/dir
      client: 0
      run_seq: [0-9]+

    Increment run_seq on subsequent invocations within a single test run;
    it is used to generate unique directory and file names.
    """

    MDSS_REQUIRED = 1
    CLIENTS_REQUIRED = 1

    def test_scrub_checks(self):
        self._checks(0)
        self._checks(1)

    def _checks(self, run_seq):
        mds_rank = 0
        test_dir = "scrub_test_path"

        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        log.info("Cloning repo into place")
        repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path)

        log.info("Initiating mds_scrub_checks on mds.{id_} test_path {path}, run_seq {seq}".format(
            id_=mds_rank, path=abs_test_path, seq=run_seq))

        success_validator = lambda j, r: self.json_validator(j, r, "return_code", 0)

        nep = "{test_path}/i/dont/exist".format(test_path=abs_test_path)
        self.asok_command(mds_rank, "flush_path {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))
        self.tell_command(mds_rank, "scrub start {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))

        test_repo_path = "{test_path}/ceph-qa-suite".format(test_path=abs_test_path)
        dirpath = "{repo_path}/suites".format(repo_path=test_repo_path)

        if run_seq == 0:
            log.info("First run: flushing {dirpath}".format(dirpath=dirpath))
            command = "flush_path {dirpath}".format(dirpath=dirpath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {dirpath}".format(dirpath=dirpath)
        self.tell_command(mds_rank, command, success_validator)

        filepath = "{repo_path}/suites/fs/verify/validater/valgrind.yaml".format(
            repo_path=test_repo_path)
        if run_seq == 0:
            log.info("First run: flushing {filepath}".format(filepath=filepath))
            command = "flush_path {filepath}".format(filepath=filepath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {filepath}".format(filepath=filepath)
        self.tell_command(mds_rank, command, success_validator)

        filepath = "{repo_path}/suites/fs/basic/clusters/fixed-3-cephfs.yaml".format(
            repo_path=test_repo_path)
        command = "scrub start {filepath}".format(filepath=filepath)
        self.tell_command(mds_rank, command,
                          lambda j, r: self.json_validator(j, r, "performed_validation",
                                                           False))

        if run_seq == 0:
            log.info("First run: flushing base dir /")
            command = "flush_path /"
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start /"
        self.tell_command(mds_rank, command, success_validator)

        new_dir = "{repo_path}/new_dir_{i}".format(repo_path=repo_path, i=run_seq)
        test_new_dir = "{repo_path}/new_dir_{i}".format(repo_path=test_repo_path,
                                                        i=run_seq)
        self.mount_a.run_shell(["mkdir", new_dir])
        command = "flush_path {dir}".format(dir=test_new_dir)
        self.asok_command(mds_rank, command, success_validator)

        new_file = "{repo_path}/new_file_{i}".format(repo_path=repo_path,
                                                     i=run_seq)
        test_new_file = "{repo_path}/new_file_{i}".format(repo_path=test_repo_path,
                                                          i=run_seq)
        self.mount_a.write_n_mb(new_file, 1)

        command = "flush_path {file}".format(file=test_new_file)
        self.asok_command(mds_rank, command, success_validator)

        # check that scrub fails on errors
        ino = self.mount_a.path_to_ino(new_file)
        rados_obj_name = "{ino:x}.00000000".format(ino=ino)
        command = "scrub start {file}".format(file=test_new_file)

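        # CephFS keeps a file's backtrace in the "parent" xattr of its first
        # data object ("<ino in hex>.00000000"), so removing that xattr (and
        # then the whole object) gives scrub corruption to report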
        # Missing parent xattr -> ENODATA
        self.fs.rados(["rmxattr", rados_obj_name, "parent"], pool=self.fs.get_data_pool_name())
        self.tell_command(mds_rank, command,
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENODATA))

        # Missing object -> ENOENT
        self.fs.rados(["rm", rados_obj_name], pool=self.fs.get_data_pool_name())
        self.tell_command(mds_rank, command,
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))

        command = "flush_path /"
        self.asok_command(mds_rank, command, success_validator)

    def test_scrub_repair(self):
        mds_rank = 0
        test_dir = "scrub_repair_path"

        self.mount_a.run_shell(["sudo", "mkdir", test_dir])
        self.mount_a.run_shell(["sudo", "touch", "{0}/file".format(test_dir)])
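        # the directory's (first) dirfrag lives in the metadata pool as object
        # "<ino in hex>.00000000", with one omap key per dentry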
        dir_objname = "{:x}.00000000".format(self.mount_a.path_to_ino(test_dir))

        self.mount_a.umount_wait()

        # flush journal entries to dirfrag objects, and expire journal
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_stop()

        # remove the dentry from the dirfrag, causing incorrect fragstat/rstat
        self.fs.rados(["rmomapkey", dir_objname, "file_head"],
                      pool=self.fs.get_metadata_pool_name())

        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount_wait()

        # fragstat indicates the directory is not empty, so rmdir should fail
        with self.assertRaises(CommandFailedError) as ar:
            self.mount_a.run_shell(["sudo", "rmdir", test_dir])
        self.assertEqual(ar.exception.exitstatus, 1)

        self.tell_command(mds_rank, "scrub start /{0} repair".format(test_dir),
                          lambda j, r: self.json_validator(j, r, "return_code", 0))

        # wait a few seconds for the background repair to finish
        time.sleep(10)

        # fragstat should be fixed now, so rmdir should succeed
        self.mount_a.run_shell(["sudo", "rmdir", test_dir])

    @staticmethod
    def json_validator(json_out, rc, element, expected_value):
        if rc != 0:
            return False, "asok command returned error {rc}".format(rc=rc)
        element_value = json_out.get(element)
        if element_value != expected_value:
            return False, "unexpectedly got {jv} instead of {ev}!".format(
                jv=element_value, ev=expected_value)
        return True, "Succeeded"

    def tell_command(self, mds_rank, command, validator):
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()
        jout = self.fs.rank_tell(command_list, mds_rank)

        log.info("command '{command}' returned '{jout}'".format(
            command=command, jout=jout))

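        # rank_tell raises on a failed command, so if we got this far the
        # return code is effectively 0 for the validator's purposes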
        success, errstring = validator(jout, 0)
        if not success:
            raise AsokCommandFailedError(command, 0, jout, errstring)
        return jout

    def asok_command(self, mds_rank, command, validator):
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()

        # we just assume there's an active mds for every rank
        mds_id = self.fs.get_active_names()[mds_rank]
        proc = self.fs.mon_manager.admin_socket('mds', mds_id,
                                                command_list, check_status=False)
        rout = proc.exitstatus
        sout = proc.stdout.getvalue()

        if sout.strip():
            jout = json.loads(sout)
        else:
            jout = None

        log.info("command '{command}' got response code '{rout}' and stdout '{sout}'".format(
            command=command, rout=rout, sout=sout))

        success, errstring = validator(jout, rout)

        if not success:
            raise AsokCommandFailedError(command, rout, jout, errstring)

        return jout

    @staticmethod
    def clone_repo(client_mount, path):
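        # clone a fixed tree (the ceph-qa-suite repo, 'giant' branch) so the
        # flush/scrub checks have a known hierarchy to walk; skip the clone if
        # an earlier run_seq already put it in place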
        repo = "ceph-qa-suite"
        repo_path = os.path.join(path, repo)
        client_mount.run_shell(["mkdir", "-p", path])

        try:
            client_mount.stat(repo_path)
        except CommandFailedError:
            client_mount.run_shell([
                "git", "clone", "--branch", "giant",
                "http://github.com/ceph/{repo}".format(repo=repo),
                "{path}/{repo}".format(path=path, repo=repo)
            ])

        return repo_path


class AsokCommandFailedError(Exception):
    """
    Exception thrown when we get an unexpected response
    on an admin socket command
    """

    def __init__(self, command, rc, json_out, errstring):
        self.command = command
        self.rc = rc
        self.json = json_out
        self.errstring = errstring

    def __str__(self):
        return "Admin socket: {command} failed with rc={rc} json output={json}, because '{es}'".format(
            command=self.command, rc=self.rc, json=self.json, es=self.errstring)