2"""
3Test our tools for recovering metadata from the data pool
4"""
5import json
6
7import logging
8import os
9from textwrap import dedent
10import traceback
11from collections import namedtuple, defaultdict
12
13from teuthology.orchestra.run import CommandFailedError
14from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology
15
16log = logging.getLogger(__name__)
17
18
19ValidationError = namedtuple("ValidationError", ["exception", "backtrace"])
20
21

class Workload(object):
    def __init__(self, filesystem, mount):
        self._mount = mount
        self._filesystem = filesystem
        self._initial_state = None

        # Accumulate backtraces for every failed validation, and return them.  Backtraces
        # are rather verbose, but we only see them when something breaks, and they
        # let us see which check failed without having to decorate each check with
        # a string
        self._errors = []

    def assert_equal(self, a, b):
        try:
            if a != b:
                raise AssertionError("{0} != {1}".format(a, b))
        except AssertionError as e:
            self._errors.append(
                ValidationError(e, traceback.format_exc(3))
            )

    def write(self):
        """
        Write the workload files to the mount
        """
        raise NotImplementedError()

    def validate(self):
        """
        Read from the mount and validate that the workload files are present (i.e. have
        survived or been reconstructed from the test scenario)
        """
        raise NotImplementedError()

    def damage(self):
        """
        Damage the filesystem pools in ways that will be interesting to recover from.  By
        default just wipe everything in the metadata pool
        """
        # Delete every object in the metadata pool
        objects = self._filesystem.rados(["ls"]).split("\n")
        for o in objects:
            self._filesystem.rados(["rm", o])

    def flush(self):
        """
        Called after client unmount, after write: flush whatever you want
        """
        self._filesystem.mds_asok(["flush", "journal"])


class SimpleWorkload(Workload):
    """
    Single file, single directory, check that it gets recovered and so does its size
    """
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def validate(self):
        self._mount.run_shell(["ls", "subdir"])
        st = self._mount.stat("subdir/sixmegs")
        self.assert_equal(st['st_size'], self._initial_state['st_size'])
        return self._errors


class MovedFile(Workload):
    def write(self):
        # Create a file whose backtrace disagrees with its eventual position
        # in the metadata.  We will see that it gets reconstructed in its
        # original position according to its backtrace.
        self._mount.run_shell(["mkdir", "subdir_alpha"])
        self._mount.run_shell(["mkdir", "subdir_bravo"])
        self._mount.write_n_mb("subdir_alpha/sixmegs", 6)
        self._filesystem.mds_asok(["flush", "journal"])
        self._mount.run_shell(["mv", "subdir_alpha/sixmegs", "subdir_bravo/sixmegs"])
        self._initial_state = self._mount.stat("subdir_bravo/sixmegs")

    def flush(self):
        pass

    def validate(self):
        self.assert_equal(self._mount.ls(), ["subdir_alpha"])
        st = self._mount.stat("subdir_alpha/sixmegs")
        self.assert_equal(st['st_size'], self._initial_state['st_size'])
        return self._errors


class BacktracelessFile(Workload):
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def flush(self):
        # Never flush metadata, so backtrace won't be written
        pass

    def validate(self):
        ino_name = "%x" % self._initial_state["st_ino"]

        # The inode should be linked into lost+found because we had no path for it
        self.assert_equal(self._mount.ls(), ["lost+found"])
        self.assert_equal(self._mount.ls("lost+found"), [ino_name])
        st = self._mount.stat("lost+found/{ino_name}".format(ino_name=ino_name))

        # We might not have got the name or path, but we should still get the size
        self.assert_equal(st['st_size'], self._initial_state['st_size'])

        return self._errors


class StripedStashedLayout(Workload):
    def __init__(self, fs, m):
        super(StripedStashedLayout, self).__init__(fs, m)

        # Nice small stripes so we can quickly do our writes+validates
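        # ss, sc and os are shorthand for the layout fields stripe_unit,
        # stripe_count and object_size used in the vxattr set in write() below.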
        self.sc = 4
        self.ss = 65536
        self.os = 262144

        self.interesting_sizes = [
            # Exactly stripe_count objects will exist
            self.os * self.sc,
            # Fewer than stripe_count objects will exist
            self.os * self.sc / 2,
            self.os * (self.sc - 1) + self.os / 2,
            self.os * (self.sc - 1) + self.os / 2 - 1,
            self.os * (self.sc + 1) + self.os / 2,
            self.os * (self.sc + 1) + self.os / 2 + 1,
            # More than stripe_count objects will exist
            self.os * self.sc + self.os * self.sc / 2
        ]

    def write(self):
        # Create a dir with a striped layout set on it
        self._mount.run_shell(["mkdir", "stripey"])

        self._mount.setfattr("./stripey", "ceph.dir.layout",
                             "stripe_unit={ss} stripe_count={sc} object_size={os} pool={pool}".format(
                                 ss=self.ss, os=self.os, sc=self.sc,
                                 pool=self._filesystem.get_data_pool_name()
                             ))

        # Write files, then flush metadata so that their layout gets written into an xattr
        for i, n_bytes in enumerate(self.interesting_sizes):
            self._mount.write_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
            # This is really just validating the validator
            self._mount.validate_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
        self._filesystem.mds_asok(["flush", "journal"])

        # Write another file in the same way, but this time don't flush the metadata,
        # so that it won't have the layout xattr
        self._mount.write_test_pattern("stripey/unflushed_file", 1024 * 512)
        self._mount.validate_test_pattern("stripey/unflushed_file", 1024 * 512)

        self._initial_state = {
            "unflushed_ino": self._mount.path_to_ino("stripey/unflushed_file")
        }

    def flush(self):
        # Pass because we already selectively flushed during write
        pass

    def validate(self):
        # The flushed files should have been recovered into their original locations
        # with the correct layout: read back correct data
        for i, n_bytes in enumerate(self.interesting_sizes):
            try:
                self._mount.validate_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
            except CommandFailedError as e:
                self._errors.append(
                    ValidationError("File {0} (size {1}): {2}".format(i, n_bytes, e), traceback.format_exc(3))
                )

        # The unflushed file should have been recovered into lost+found without
        # the correct layout: read back junk
        ino_name = "%x" % self._initial_state["unflushed_ino"]
        self.assert_equal(self._mount.ls("lost+found"), [ino_name])
        try:
            self._mount.validate_test_pattern(os.path.join("lost+found", ino_name), 1024 * 512)
        except CommandFailedError:
            pass
        else:
            self._errors.append(
                ValidationError("Unexpectedly valid data in unflushed striped file", "")
            )

        return self._errors


class ManyFilesWorkload(Workload):
    def __init__(self, filesystem, mount, file_count):
        super(ManyFilesWorkload, self).__init__(filesystem, mount)
        self.file_count = file_count

    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        for n in range(0, self.file_count):
            self._mount.write_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)

    def validate(self):
        for n in range(0, self.file_count):
            try:
                self._mount.validate_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)
            except CommandFailedError as e:
                self._errors.append(
                    ValidationError("File {0}: {1}".format(n, e), traceback.format_exc(3))
                )

        return self._errors


class MovedDir(Workload):
    def write(self):
        # Create a nested dir that we will then move.  Two files with two different
        # backtraces referring to the moved dir, claiming two different locations for
        # it.  We will see that only one backtrace wins and the dir ends up with
        # single linkage.
        self._mount.run_shell(["mkdir", "-p", "grandmother/parent"])
        self._mount.write_n_mb("grandmother/parent/orig_pos_file", 1)
        self._filesystem.mds_asok(["flush", "journal"])
        self._mount.run_shell(["mkdir", "grandfather"])
        self._mount.run_shell(["mv", "grandmother/parent", "grandfather"])
        self._mount.write_n_mb("grandfather/parent/new_pos_file", 2)
        self._filesystem.mds_asok(["flush", "journal"])

        self._initial_state = (
            self._mount.stat("grandfather/parent/orig_pos_file"),
            self._mount.stat("grandfather/parent/new_pos_file")
        )

    def validate(self):
        root_files = self._mount.ls()
        self.assert_equal(len(root_files), 1)
        self.assert_equal(root_files[0] in ["grandfather", "grandmother"], True)
        winner = root_files[0]
        st_opf = self._mount.stat("{0}/parent/orig_pos_file".format(winner))
        st_npf = self._mount.stat("{0}/parent/new_pos_file".format(winner))

        self.assert_equal(st_opf['st_size'], self._initial_state[0]['st_size'])
        self.assert_equal(st_npf['st_size'], self._initial_state[1]['st_size'])


class MissingZerothObject(Workload):
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def damage(self):
        super(MissingZerothObject, self).damage()
        zeroth_id = "{0:x}.00000000".format(self._initial_state['st_ino'])
        self._filesystem.rados(["rm", zeroth_id], pool=self._filesystem.get_data_pool_name())

    def validate(self):
        st = self._mount.stat("lost+found/{0:x}".format(self._initial_state['st_ino']))
        self.assert_equal(st['st_size'], self._initial_state['st_size'])


class NonDefaultLayout(Workload):
    """
    Check that the reconstruction copes with files that have a different
    object size in their layout
    """
    def write(self):
        self._mount.run_shell(["touch", "datafile"])
        self._mount.setfattr("./datafile", "ceph.file.layout.object_size", "8388608")
        self._mount.run_shell(["dd", "if=/dev/urandom", "of=./datafile", "bs=1M", "count=32"])
        self._initial_state = self._mount.stat("datafile")

    def validate(self):
        # Check we got the layout reconstructed properly
        object_size = int(self._mount.getfattr(
            "./datafile", "ceph.file.layout.object_size"))
        self.assert_equal(object_size, 8388608)

        # Check we got the file size reconstructed properly
        st = self._mount.stat("datafile")
        self.assert_equal(st['st_size'], self._initial_state['st_size'])


class TestDataScan(CephFSTestCase):
    MDSS_REQUIRED = 2

    def is_marked_damaged(self, rank):
        mds_map = self.fs.get_mds_map()
        return rank in mds_map['damaged']

    def _rebuild_metadata(self, workload, workers=1):
        """
        That when all objects in the metadata pool are removed, we can rebuild the metadata pool
        based on the contents of the data pool, and a client can see and read our files.
        """

        # First, inject some files

        workload.write()

        # Unmount the client and flush the journal: the tool should also cope with
        # situations where there is dirty metadata, but we'll test that separately
        self.mount_a.umount_wait()
        workload.flush()

        # Stop the MDS
        self.fs.mds_stop()
        self.fs.mds_fail()

        # After recovery, we need the MDS to not be strict about stats (in production these options
        # are off by default, but in QA we need to explicitly disable them)
        self.fs.set_ceph_conf('mds', 'mds verify scatter', False)
        self.fs.set_ceph_conf('mds', 'mds debug scatterstat', False)

        # Apply any data damage the workload wants
        workload.damage()

        # Reset the MDS map in case multiple ranks were in play: recovery procedure
        # only understands how to rebuild metadata under rank 0
        self.fs.mon_manager.raw_cluster_cmd('fs', 'reset', self.fs.name,
                                            '--yes-i-really-mean-it')

        self.fs.mds_restart()

        def get_state(mds_id):
            info = self.mds_cluster.get_mds_info(mds_id)
            return info['state'] if info is not None else None

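        # With the metadata pool damaged, the restarted MDS should fail to bring
        # up rank 0: wait for the rank to be flagged damaged and for all daemons
        # to settle back into standby before running the offline tools.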
        self.wait_until_true(lambda: self.is_marked_damaged(0), 60)
        for mds_id in self.fs.mds_ids:
            self.wait_until_equal(
                lambda: get_state(mds_id),
                "up:standby",
                timeout=60)

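        # The session, snap and inode tables were lost along with the rest of the
        # metadata pool, so reset them before rebuilding anything.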
        self.fs.table_tool([self.fs.name + ":0", "reset", "session"])
        self.fs.table_tool([self.fs.name + ":0", "reset", "snap"])
        self.fs.table_tool([self.fs.name + ":0", "reset", "inode"])

        # Run the recovery procedure
        if False:
            with self.assertRaises(CommandFailedError):
                # Normal reset should fail when no objects are present, we'll use --force instead
                self.fs.journal_tool(["journal", "reset"])

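        # Reset the journal, then let cephfs-data-scan rebuild the metadata pool:
        # "init" recreates the root and MDS directory inodes, "scan_extents" walks
        # the data pool to recover file sizes/mtimes, and "scan_inodes" re-injects
        # inodes and dentries from the backtraces stored on the data objects.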
        self.fs.journal_tool(["journal", "reset", "--force"])
        self.fs.data_scan(["init"])
        self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()], worker_count=workers)
        self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()], worker_count=workers)

        # Mark the MDS repaired
        self.fs.mon_manager.raw_cluster_cmd('mds', 'repaired', '0')

        # Start the MDS
        self.fs.mds_restart()
        self.fs.wait_for_daemons()
        log.info(str(self.mds_cluster.status()))

        # Mount a client
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # See that the files are present and correct
        errors = workload.validate()
        if errors:
            log.error("Validation errors found: {0}".format(len(errors)))
            for e in errors:
                log.error(e.exception)
                log.error(e.backtrace)
            raise AssertionError("Validation failed, first error: {0}\n{1}".format(
                errors[0].exception, errors[0].backtrace
            ))

    def test_rebuild_simple(self):
        self._rebuild_metadata(SimpleWorkload(self.fs, self.mount_a))

    def test_rebuild_moved_file(self):
        self._rebuild_metadata(MovedFile(self.fs, self.mount_a))

    def test_rebuild_backtraceless(self):
        self._rebuild_metadata(BacktracelessFile(self.fs, self.mount_a))

    def test_rebuild_moved_dir(self):
        self._rebuild_metadata(MovedDir(self.fs, self.mount_a))

    def test_rebuild_missing_zeroth(self):
        self._rebuild_metadata(MissingZerothObject(self.fs, self.mount_a))

    def test_rebuild_nondefault_layout(self):
        self._rebuild_metadata(NonDefaultLayout(self.fs, self.mount_a))

    def test_stashed_layout(self):
        self._rebuild_metadata(StripedStashedLayout(self.fs, self.mount_a))

    def _dirfrag_keys(self, object_id):
        keys_str = self.fs.rados(["listomapkeys", object_id])
        if keys_str:
            return keys_str.split("\n")
        else:
            return []

    def test_fragmented_injection(self):
        """
        That when injecting a dentry into a fragmented directory, we put it in the right fragment.
        """

        self.fs.set_allow_dirfrags(True)

        file_count = 100
        file_names = ["%s" % n for n in range(0, file_count)]

        # Create a directory of `file_count` files, each named after its
        # decimal number and containing the string of its decimal number
        self.mount_a.run_python(dedent("""
        import os
        path = os.path.join("{path}", "subdir")
        os.mkdir(path)
        for n in range(0, {file_count}):
            open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
        """.format(
            path=self.mount_a.mountpoint,
            file_count=file_count
        )))

        dir_ino = self.mount_a.path_to_ino("subdir")

        # Only one MDS should be active!
        self.assertEqual(len(self.fs.get_active_names()), 1)

        # Ensure that one directory is fragmented
        mds_id = self.fs.get_active_names()[0]
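        # Split the root fragment ("0/0") of /subdir by one bit, giving two
        # fragments; the asok arguments are <path> <frag> <bits to split by>.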
        self.fs.mds_asok(["dirfrag", "split", "/subdir", "0/0", "1"], mds_id)

        # Flush journal and stop MDS
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.fs.mds_stop()
        self.fs.mds_fail()

        # Pick a dentry and wipe out its key
        # Because I did a 1 bit split, I know one frag will be named <inode>.01000000
        frag_obj_id = "{0:x}.01000000".format(dir_ino)
        keys = self._dirfrag_keys(frag_obj_id)
        victim_key = keys[7]  # arbitrary choice
        log.info("victim_key={0}".format(victim_key))
        victim_dentry = victim_key.split("_head")[0]
        self.fs.rados(["rmomapkey", frag_obj_id, victim_key])

        # Start filesystem back up, observe that the file appears to be gone in an `ls`
        self.fs.mds_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        files = self.mount_a.run_shell(["ls", "subdir/"]).stdout.getvalue().strip().split("\n")
        self.assertListEqual(sorted(files), sorted(list(set(file_names) - set([victim_dentry]))))

        # Stop the filesystem
        self.mount_a.umount_wait()
        self.fs.mds_stop()
        self.fs.mds_fail()

        # Run data-scan, observe that it inserts our dentry back into the correct fragment
        # by checking the omap now has the dentry's key again
        self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()])
        self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()])
        self.assertIn(victim_key, self._dirfrag_keys(frag_obj_id))

        # Start the filesystem and check that the dentry we deleted is now once again visible
        # and points to the correct file data.
        self.fs.mds_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        out = self.mount_a.run_shell(["cat", "subdir/{0}".format(victim_dentry)]).stdout.getvalue().strip()
        self.assertEqual(out, victim_dentry)

        # Finally, close the loop by checking our injected dentry survives a merge
        mds_id = self.fs.get_active_names()[0]
        self.mount_a.ls("subdir")  # Do an ls to ensure both frags are in cache so the merge will work
        self.fs.mds_asok(["dirfrag", "merge", "/subdir", "0/0"], mds_id)
        self.fs.mds_asok(["flush", "journal"], mds_id)
        frag_obj_id = "{0:x}.00000000".format(dir_ino)
        keys = self._dirfrag_keys(frag_obj_id)
        self.assertListEqual(sorted(keys), sorted(["%s_head" % f for f in file_names]))

    @for_teuthology
    def test_parallel_execution(self):
        self._rebuild_metadata(ManyFilesWorkload(self.fs, self.mount_a, 25), workers=7)

    def test_pg_files(self):
        """
        That the pg files command tells us which files are associated with
        a particular PG
        """
        file_count = 20
        self.mount_a.run_shell(["mkdir", "mydir"])
        self.mount_a.create_n_files("mydir/myfile", file_count)

        # Some files elsewhere in the system that we will ignore
        # to check that the tool is filtering properly
        self.mount_a.run_shell(["mkdir", "otherdir"])
        self.mount_a.create_n_files("otherdir/otherfile", file_count)

        pgs_to_files = defaultdict(list)
        # Rough (slow) reimplementation of the logic
        for i in range(0, file_count):
            file_path = "mydir/myfile_{0}".format(i)
            ino = self.mount_a.path_to_ino(file_path)
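            # A file's first RADOS data object is named "<inode hex>.<object
            # number as 8 hex digits>"; ask the OSD map which PG it falls into.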
            obj = "{0:x}.{1:08x}".format(ino, 0)
            pgid = json.loads(self.fs.mon_manager.raw_cluster_cmd(
                "osd", "map", self.fs.get_data_pool_name(), obj,
                "--format=json-pretty"
            ))['pgid']
            pgs_to_files[pgid].append(file_path)
            log.info("{0}: {1}".format(file_path, pgid))

        pg_count = self.fs.get_pgs_per_fs_pool()
        for pg_n in range(0, pg_count):
            pg_str = "{0}.{1}".format(self.fs.get_data_pool_id(), pg_n)
            out = self.fs.data_scan(["pg_files", "mydir", pg_str])
            lines = [l for l in out.split("\n") if l]
            log.info("{0}: {1}".format(pg_str, lines))
            self.assertSetEqual(set(lines), set(pgs_to_files[pg_str]))

    def test_scan_links(self):
        """
        The scan_links command fixes linkage errors
        """
        self.mount_a.run_shell(["mkdir", "testdir1"])
        self.mount_a.run_shell(["mkdir", "testdir2"])
        dir1_ino = self.mount_a.path_to_ino("testdir1")
        dir2_ino = self.mount_a.path_to_ino("testdir2")
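        # Dirfrag omap objects are named "<dir inode hex>.<frag>"; 00000000 is the
        # unsplit root fragment, so these objects hold each directory's dentries.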
        dirfrag1_oid = "{0:x}.00000000".format(dir1_ino)
        dirfrag2_oid = "{0:x}.00000000".format(dir2_ino)

        self.mount_a.run_shell(["touch", "testdir1/file1"])
        self.mount_a.run_shell(["ln", "testdir1/file1", "testdir1/link1"])
        self.mount_a.run_shell(["ln", "testdir1/file1", "testdir2/link2"])

        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["flush", "journal"], mds_id)

        dirfrag1_keys = self._dirfrag_keys(dirfrag1_oid)

        # introduce duplicated primary link
        file1_key = "file1_head"
        self.assertIn(file1_key, dirfrag1_keys)
        file1_omap_data = self.fs.rados(["getomapval", dirfrag1_oid, file1_key, '-'])
        self.fs.rados(["setomapval", dirfrag2_oid, file1_key], stdin_data=file1_omap_data)
        self.assertIn(file1_key, self._dirfrag_keys(dirfrag2_oid))

        # remove a remote link, make inode link count incorrect
        link1_key = 'link1_head'
        self.assertIn(link1_key, dirfrag1_keys)
        self.fs.rados(["rmomapkey", dirfrag1_oid, link1_key])

        # increase good primary link's version
        self.mount_a.run_shell(["touch", "testdir1/file1"])
        self.mount_a.umount_wait()

        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.fs.mds_stop()
        self.fs.mds_fail()

        # repair linkage errors
        self.fs.data_scan(["scan_links"])
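        # scan_links should keep only the higher-versioned primary dentry (the one
        # in testdir1) and correct the inode's link count for the removed remote link.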

        # primary link in testdir2 was deleted?
        self.assertNotIn(file1_key, self._dirfrag_keys(dirfrag2_oid))

        self.fs.mds_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # link count was adjusted?
        file1_nlink = self.mount_a.path_to_nlink("testdir1/file1")
        self.assertEqual(file1_nlink, 2)