1 """
2 MDS admin socket scrubbing-related tests.
3 """
4 import json
5 import logging
6 import errno
7 import time
8 from teuthology.exceptions import CommandFailedError
9 import os
10 from tasks.cephfs.cephfs_test_case import CephFSTestCase
11
12 log = logging.getLogger(__name__)
13
class TestScrubControls(CephFSTestCase):
    """
    Test basic scrub control operations such as abort, pause and resume.
    """

    MDSS_REQUIRED = 2
    CLIENTS_REQUIRED = 1

    def _abort_scrub(self, expected):
        res = self.fs.rank_tell(["scrub", "abort"])
        self.assertEqual(res['return_code'], expected)

    def _pause_scrub(self, expected):
        res = self.fs.rank_tell(["scrub", "pause"])
        self.assertEqual(res['return_code'], expected)

    def _resume_scrub(self, expected):
        res = self.fs.rank_tell(["scrub", "resume"])
        self.assertEqual(res['return_code'], expected)

    def _get_scrub_status(self):
        return self.fs.rank_tell(["scrub", "status"])

    def _check_task_status(self, expected_status):
        task_status = self.fs.get_task_status("scrub status")
        active = self.fs.get_active_names()
        log.debug("current active={0}".format(active))
        self.assertTrue(task_status[active[0]].startswith(expected_status))
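
    # Shape notes (hedged -- the exact strings vary by Ceph release):
    # "scrub status" over the tell interface returns JSON along the lines
    # of {"status": "no active scrubs running"}, while
    # get_task_status("scrub status") returns a map keyed by active MDS
    # name, e.g. {"a": "idle"} -- hence the active[0] lookup above.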

    def test_scrub_abort(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        log.info("Cloning repo into place")
        TestScrubChecks.clone_repo(self.mount_a, client_path)

        out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # abort and verify
        self._abort_scrub(0)
        out_json = self._get_scrub_status()
        self.assertIn("no active", out_json['status'])

        # sleep long enough for the mgr to fetch updated task status
        time.sleep(10)
        self._check_task_status("idle")

    def test_scrub_pause_and_resume(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        log.info("Cloning repo into place")
        TestScrubChecks.clone_repo(self.mount_a, client_path)

        out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self._get_scrub_status()
        self.assertIn("PAUSED", out_json['status'])

        # sleep long enough for the mgr to fetch updated task status
        time.sleep(10)
        self._check_task_status("paused")

        # resume and verify
        self._resume_scrub(0)
        out_json = self._get_scrub_status()
        self.assertNotIn("PAUSED", out_json['status'])

    def test_scrub_pause_and_resume_with_abort(self):
        test_dir = "scrub_control_test_path"
        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        log.info("Cloning repo into place")
        TestScrubChecks.clone_repo(self.mount_a, client_path)

        out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
        self.assertNotEqual(out_json, None)

        # pause and verify
        self._pause_scrub(0)
        out_json = self._get_scrub_status()
        self.assertIn("PAUSED", out_json['status'])

        # sleep long enough for the mgr to fetch updated task status
        time.sleep(10)
        self._check_task_status("paused")

        # abort and verify: aborting while paused empties the scrub stack
        # ("0 inodes"), but the scrub state stays PAUSED until resumed
        self._abort_scrub(0)
        out_json = self._get_scrub_status()
        self.assertIn("PAUSED", out_json['status'])
        self.assertIn("0 inodes", out_json['status'])

        # sleep long enough for the mgr to fetch updated task status
        time.sleep(10)
        self._check_task_status("paused")

        # resume and verify
        self._resume_scrub(0)
        out_json = self._get_scrub_status()
        self.assertIn("no active", out_json['status'])

        # sleep long enough for the mgr to fetch updated task status
        time.sleep(10)
        self._check_task_status("idle")

    def test_scrub_task_status_on_mds_failover(self):
        # sleep long enough for the mgr to fetch updated task status
        time.sleep(10)

        (original_active, ) = self.fs.get_active_names()
        original_standbys = self.mds_cluster.get_standby_daemons()
        self._check_task_status("idle")

        # kill rank 0
        self.fs.mds_stop(original_active)

        grace = float(self.fs.get_config("mds_beacon_grace", service_type="mon"))

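        # once the active MDS stops sending beacons, the monitors should
        # fail it over within roughly mds_beacon_grace seconds; waiting
        # twice that for a standby promotion leaves some headroom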
        def promoted():
            active = self.fs.get_active_names()
            return active and active[0] in original_standbys

        log.info("Waiting for promotion of one of the original standbys {0}".format(
            original_standbys))
        self.wait_until_true(promoted, timeout=grace*2)

        # the mgr prunes task status for daemons that stop reporting within
        # mgr_service_beacon_grace, so the failed MDS should drop out of the map
        mgr_beacon_grace = float(self.fs.get_config("mgr_service_beacon_grace", service_type="mon"))

        def status_check():
            task_status = self.fs.get_task_status("scrub status")
            return original_active not in task_status

        self.wait_until_true(status_check, timeout=mgr_beacon_grace*2)


class TestScrubChecks(CephFSTestCase):
    """
    Run flush and scrub commands on the specified files in the filesystem.
    This task runs through a sequence of operations, but it is not
    comprehensive on its own -- it does not manipulate the MDS cache state
    to exercise both the in-memory and out-of-memory parts of the
    hierarchy. It is therefore designed to be run multiple times within a
    single test run, so that the caller can manipulate memory state
    between invocations.

    Usage:
    mds_scrub_checks:
      mds_rank: 0
      path: path/to/test/dir
      client: 0
      run_seq: [0-9]+

    Increment run_seq on subsequent invocations within a single test run;
    it is used to generate unique folder and file names.
    """

    MDSS_REQUIRED = 1
    CLIENTS_REQUIRED = 1

    def test_scrub_checks(self):
        self._checks(0)
        self._checks(1)

    def _checks(self, run_seq):
        mds_rank = 0
        test_dir = "scrub_test_path"

        abs_test_path = "/{0}".format(test_dir)

        log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
        client_path = os.path.join(self.mount_a.mountpoint, test_dir)
        log.info("client_path: {0}".format(client_path))

        log.info("Cloning repo into place")
        repo_path = TestScrubChecks.clone_repo(self.mount_a, client_path)

        log.info("Initiating mds_scrub_checks on mds.{id_} test_path {path}, run_seq {seq}".format(
            id_=mds_rank, path=abs_test_path, seq=run_seq))

        success_validator = lambda j, r: self.json_validator(j, r, "return_code", 0)

        nep = "{test_path}/i/dont/exist".format(test_path=abs_test_path)
        self.asok_command(mds_rank, "flush_path {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))
        self.tell_command(mds_rank, "scrub start {nep}".format(nep=nep),
                          lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))

        test_repo_path = "{test_path}/ceph-qa-suite".format(test_path=abs_test_path)
        dirpath = "{repo_path}/suites".format(repo_path=test_repo_path)

        if run_seq == 0:
            log.info("First run: flushing {dirpath}".format(dirpath=dirpath))
            command = "flush_path {dirpath}".format(dirpath=dirpath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {dirpath}".format(dirpath=dirpath)
        self.tell_command(mds_rank, command, success_validator)

        filepath = "{repo_path}/suites/fs/verify/validater/valgrind.yaml".format(
            repo_path=test_repo_path)
        if run_seq == 0:
            log.info("First run: flushing {filepath}".format(filepath=filepath))
            command = "flush_path {filepath}".format(filepath=filepath)
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start {filepath}".format(filepath=filepath)
        self.tell_command(mds_rank, command, success_validator)

        filepath = "{repo_path}/suites/fs/basic/clusters/fixed-3-cephfs.yaml".format(
            repo_path=test_repo_path)
        command = "scrub start {filepath}".format(filepath=filepath)
        self.tell_command(mds_rank, command,
                          lambda j, r: self.json_validator(j, r, "performed_validation",
                                                           False))

        if run_seq == 0:
            log.info("First run: flushing base dir /")
            command = "flush_path /"
            self.asok_command(mds_rank, command, success_validator)
        command = "scrub start /"
        self.tell_command(mds_rank, command, success_validator)

        new_dir = "{repo_path}/new_dir_{i}".format(repo_path=repo_path, i=run_seq)
        test_new_dir = "{repo_path}/new_dir_{i}".format(repo_path=test_repo_path,
                                                        i=run_seq)
        self.mount_a.run_shell(["mkdir", new_dir])
        command = "flush_path {dir}".format(dir=test_new_dir)
        self.asok_command(mds_rank, command, success_validator)

        new_file = "{repo_path}/new_file_{i}".format(repo_path=repo_path,
                                                     i=run_seq)
        test_new_file = "{repo_path}/new_file_{i}".format(repo_path=test_repo_path,
                                                          i=run_seq)
        self.mount_a.write_n_mb(new_file, 1)

        command = "flush_path {file}".format(file=test_new_file)
        self.asok_command(mds_rank, command, success_validator)

        # check that scrub fails on errors
        ino = self.mount_a.path_to_ino(new_file)
        rados_obj_name = "{ino:x}.00000000".format(ino=ino)
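        # file data lives in the data pool as RADOS objects named
        # "<inode number in hex>.<object index>"; with the default 4 MB
        # object size, a 1 MB file has a single object, "<ino>.00000000"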
265 command = "scrub start {file}".format(file=test_new_file)
266
267 # Missing parent xattr -> ENODATA
268 self.fs.rados(["rmxattr", rados_obj_name, "parent"], pool=self.fs.get_data_pool_name())
269 self.tell_command(mds_rank, command,
270 lambda j, r: self.json_validator(j, r, "return_code", -errno.ENODATA))
271
272 # Missing object -> ENOENT
273 self.fs.rados(["rm", rados_obj_name], pool=self.fs.get_data_pool_name())
274 self.tell_command(mds_rank, command,
275 lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))
276
277 command = "flush_path /"
278 self.asok_command(mds_rank, command, success_validator)
279
    def test_scrub_repair(self):
        mds_rank = 0
        test_dir = "scrub_repair_path"

        self.mount_a.run_shell(["sudo", "mkdir", test_dir])
        self.mount_a.run_shell(["sudo", "touch", "{0}/file".format(test_dir)])
        dir_objname = "{:x}.00000000".format(self.mount_a.path_to_ino(test_dir))
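        # a directory's dentries are stored as omap entries on its dirfrag
        # object in the metadata pool, named "<dir ino in hex>.<frag id>"
        # (".00000000" for an unfragmented directory); head dentries are
        # keyed "<name>_head"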

        self.mount_a.umount_wait()

        # flush journal entries to dirfrag objects, and expire journal
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_stop()

        # remove the dentry from the dirfrag, causing incorrect fragstat/rstat
        self.fs.rados(["rmomapkey", dir_objname, "file_head"],
                      pool=self.fs.get_metadata_pool_name())

        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # fragstat indicates the directory is not empty, so rmdir should fail
        with self.assertRaises(CommandFailedError) as ar:
            self.mount_a.run_shell(["sudo", "rmdir", test_dir])
        self.assertEqual(ar.exception.exitstatus, 1)

        self.tell_command(mds_rank, "scrub start /{0} repair".format(test_dir),
                          lambda j, r: self.json_validator(j, r, "return_code", 0))

        # wait a few seconds for the background repair to complete
        time.sleep(10)

        # the fragstat should now be fixed, so rmdir succeeds
        self.mount_a.run_shell(["sudo", "rmdir", test_dir])

    @staticmethod
    def json_validator(json_out, rc, element, expected_value):
        if rc != 0:
            return False, "asok command returned error {rc}".format(rc=rc)
        element_value = json_out.get(element)
        if element_value != expected_value:
            return False, "unexpectedly got {jv} instead of {ev}!".format(
                jv=element_value, ev=expected_value)
        return True, "Succeeded"

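    # For example, json_validator({"return_code": 0}, 0, "return_code", 0)
    # yields (True, "Succeeded"); a nonzero rc or a mismatched field yields
    # (False, <error string>).
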
    def tell_command(self, mds_rank, command, validator):
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()
        jout = self.fs.rank_tell(command_list, mds_rank)

        log.info("command '{command}' returned '{jout}'".format(
            command=command, jout=jout))

        # rank_tell raises on command failure, so by this point the return
        # code is always 0
        success, errstring = validator(jout, 0)
        if not success:
            raise AsokCommandFailedError(command, 0, jout, errstring)
        return jout

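    # Example call (mirroring usage above):
    #   self.tell_command(0, "scrub start /dir recursive",
    #                     lambda j, r: self.json_validator(j, r, "return_code", 0))
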
    def asok_command(self, mds_rank, command, validator):
        log.info("Running command '{command}'".format(command=command))

        command_list = command.split()

        # we just assume there's an active mds for every rank
        mds_id = self.fs.get_active_names()[mds_rank]
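        # equivalent to running "ceph daemon mds.<id> <command>" against
        # the daemon's admin socket on its host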
        proc = self.fs.mon_manager.admin_socket('mds', mds_id,
                                                command_list, check_status=False)
        rout = proc.exitstatus
        sout = proc.stdout.getvalue()

        if sout.strip():
            jout = json.loads(sout)
        else:
            jout = None

        log.info("command '{command}' got response code '{rout}' and stdout '{sout}'".format(
            command=command, rout=rout, sout=sout))

        success, errstring = validator(jout, rout)

        if not success:
            raise AsokCommandFailedError(command, rout, jout, errstring)

        return jout

    @staticmethod
    def clone_repo(client_mount, path):
        # the repo contents are arbitrary; it simply provides a realistic
        # directory tree for flush/scrub to walk
        repo = "ceph-qa-suite"
        repo_path = os.path.join(path, repo)
        client_mount.run_shell(["mkdir", "-p", path])

        try:
            client_mount.stat(repo_path)
        except CommandFailedError:
            client_mount.run_shell([
                "git", "clone", '--branch', 'giant',
                "http://github.com/ceph/{repo}".format(repo=repo),
                "{path}/{repo}".format(path=path, repo=repo)
            ])

        return repo_path


class AsokCommandFailedError(Exception):
    """
    Exception raised when we get an unexpected response
    to an admin socket command.
    """

    def __init__(self, command, rc, json_out, errstring):
        self.command = command
        self.rc = rc
        self.json = json_out
        self.errstring = errstring

    def __str__(self):
        return "Admin socket: {command} failed with rc={rc} json output={json}, because '{es}'".format(
            command=self.command, rc=self.rc, json=self.json, es=self.errstring)