1
2 """
3 Test our tools for recovering metadata from the data pool
4 """
5 import json
6
7 import logging
8 import os
9 import time
10 import traceback
11 import stat
12
13 from io import BytesIO, StringIO
14 from collections import namedtuple, defaultdict
15 from textwrap import dedent
16
17 from teuthology.exceptions import CommandFailedError
18 from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology
19
20 log = logging.getLogger(__name__)
21
22
23 ValidationError = namedtuple("ValidationError", ["exception", "backtrace"])
24
25
26 class Workload(object):
27 def __init__(self, filesystem, mount):
28 self._mount = mount
29 self._filesystem = filesystem
30 self._initial_state = None
31
32 # Accumulate backtraces for every failed validation, and return them. Backtraces
33 # are rather verbose, but we only see them when something breaks, and they
34 # let us see which check failed without having to decorate each check with
35 # a string
36 self._errors = []
37
38 def assert_equal(self, a, b):
39 try:
40 if a != b:
41 raise AssertionError("{0} != {1}".format(a, b))
42 except AssertionError as e:
43 self._errors.append(
44 ValidationError(e, traceback.format_exc(3))
45 )
46
47 def assert_not_equal(self, a, b):
48 try:
49 if a == b:
50 raise AssertionError("{0} == {1}".format(a, b))
51 except AssertionError as e:
52 self._errors.append(
53 ValidationError(e, traceback.format_exc(3))
54 )
55
56 def assert_true(self, a):
57 try:
58 if not a:
59 raise AssertionError("{0} is not true".format(a))
60 except AssertionError as e:
61 self._errors.append(
62 ValidationError(e, traceback.format_exc(3))
63 )
64
65 def write(self):
66 """
67 Write the workload files to the mount
68 """
69 raise NotImplementedError()
70
71 def validate(self):
72 """
73 Read from the mount and validate that the workload files are present (i.e. have
74 survived or been reconstructed from the test scenario)
75 """
76 raise NotImplementedError()
77
78 def damage(self):
79 """
80 Damage the filesystem pools in ways that will be interesting to recover from. By
81 default just wipe everything in the metadata pool
82 """
83 # Delete every object in the metadata pool
84 pool = self._filesystem.get_metadata_pool_name()
85 self._filesystem.rados(["purge", pool, '--yes-i-really-really-mean-it'])
86
87 def flush(self):
88 """
89 Called after client unmount, after write: flush whatever you want
90 """
91 self._filesystem.mds_asok(["flush", "journal"])
92
93 def scrub(self):
94 """
95         Called as a final step post recovery, before verification. Right now, this
96         doesn't check whether scrub finds errors - only that the MDS doesn't
97         crash and burn during scrub.
98 """
99 out_json = self._filesystem.run_scrub(["start", "/", "repair,recursive"])
100 self.assert_not_equal(out_json, None)
101 self.assert_equal(out_json["return_code"], 0)
102 self.assert_equal(self._filesystem.wait_until_scrub_complete(tag=out_json["scrub_tag"]), True)
103
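# Subclasses below plug into TestDataScan._rebuild_metadata(), which drives each
# workload through: write() -> client unmount -> flush() -> damage() ->
# offline rebuild with cephfs-data-scan -> scrub() -> validate().
# The class below is an illustrative sketch only (essentially a restatement of
# SimpleWorkload) and is not exercised by any test; it just shows the hooks a
# new workload needs to implement.
class ExampleTinyWorkload(Workload):
    def write(self):
        self._mount.run_shell(["mkdir", "tinydir"])
        self._mount.write_n_mb("tinydir/onemeg", 1)
        self._initial_state = self._mount.stat("tinydir/onemeg")

    def validate(self):
        st = self._mount.stat("tinydir/onemeg", sudo=True)
        self.assert_equal(st['st_size'], self._initial_state['st_size'])
        return self._errors
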
104 class SimpleWorkload(Workload):
105 """
106     Single file in a single directory: check that it gets recovered, and so does its size
107 """
108 def write(self):
109 self._mount.run_shell(["mkdir", "subdir"])
110 self._mount.write_n_mb("subdir/sixmegs", 6)
111 self._initial_state = self._mount.stat("subdir/sixmegs")
112
113 def validate(self):
114 self._mount.run_shell(["sudo", "ls", "subdir"], omit_sudo=False)
115 st = self._mount.stat("subdir/sixmegs", sudo=True)
116 self.assert_equal(st['st_size'], self._initial_state['st_size'])
117 return self._errors
118
119
120 class SymlinkWorkload(Workload):
121 """
122     Symlink files: check that they get recovered as symlinks
123 """
124 def write(self):
125 self._mount.run_shell(["mkdir", "symdir"])
126 self._mount.write_n_mb("symdir/onemegs", 1)
127 self._mount.run_shell(["ln", "-s", "onemegs", "symdir/symlink_onemegs"])
128 self._mount.run_shell(["ln", "-s", "symdir/onemegs", "symlink1_onemegs"])
129
130 def validate(self):
131 self._mount.run_shell(["sudo", "ls", "symdir"], omit_sudo=False)
132 st = self._mount.lstat("symdir/symlink_onemegs")
133 self.assert_true(stat.S_ISLNK(st['st_mode']))
134 target = self._mount.readlink("symdir/symlink_onemegs")
135 self.assert_equal(target, "onemegs")
136
137 st = self._mount.lstat("symlink1_onemegs")
138 self.assert_true(stat.S_ISLNK(st['st_mode']))
139 target = self._mount.readlink("symlink1_onemegs")
140 self.assert_equal(target, "symdir/onemegs")
141 return self._errors
142
143
144 class MovedFile(Workload):
145 def write(self):
146         # Create a file whose backtrace disagrees with its eventual position
147         # in the metadata. We will see that it gets reconstructed in its
148         # original position according to its backtrace.
149 self._mount.run_shell(["mkdir", "subdir_alpha"])
150 self._mount.run_shell(["mkdir", "subdir_bravo"])
151 self._mount.write_n_mb("subdir_alpha/sixmegs", 6)
152 self._filesystem.mds_asok(["flush", "journal"])
153 self._mount.run_shell(["mv", "subdir_alpha/sixmegs", "subdir_bravo/sixmegs"])
154 self._initial_state = self._mount.stat("subdir_bravo/sixmegs")
155
156 def flush(self):
157 pass
158
159 def validate(self):
160 self.assert_equal(self._mount.ls(sudo=True), ["subdir_alpha"])
161 st = self._mount.stat("subdir_alpha/sixmegs", sudo=True)
162 self.assert_equal(st['st_size'], self._initial_state['st_size'])
163 return self._errors
164
165
166 class BacktracelessFile(Workload):
167 def write(self):
168 self._mount.run_shell(["mkdir", "subdir"])
169 self._mount.write_n_mb("subdir/sixmegs", 6)
170 self._initial_state = self._mount.stat("subdir/sixmegs")
171
172 def flush(self):
173 # Never flush metadata, so backtrace won't be written
174 pass
175
176 def validate(self):
177 ino_name = "%x" % self._initial_state["st_ino"]
178
179 # The inode should be linked into lost+found because we had no path for it
180 self.assert_equal(self._mount.ls(sudo=True), ["lost+found"])
181 self.assert_equal(self._mount.ls("lost+found", sudo=True), [ino_name])
182 st = self._mount.stat(f"lost+found/{ino_name}", sudo=True)
183
184 # We might not have got the name or path, but we should still get the size
185 self.assert_equal(st['st_size'], self._initial_state['st_size'])
186
187 return self._errors
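# Note: recovery links inodes for which no backtrace (and hence no path) is known
# into lost+found, named by their hex inode number; that is what the assertions
# above rely on.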
188
189
190 class StripedStashedLayout(Workload):
191 def __init__(self, fs, m, pool=None):
192 super(StripedStashedLayout, self).__init__(fs, m)
193
194 # Nice small stripes so we can quickly do our writes+validates
195 self.sc = 4
196 self.ss = 65536
197 self.os = 262144
198         self.pool = pool if pool else self._filesystem.get_data_pool_name()
199
200 self.interesting_sizes = [
201 # Exactly stripe_count objects will exist
202 self.os * self.sc,
203 # Fewer than stripe_count objects will exist
204 self.os * self.sc // 2,
205 self.os * (self.sc - 1) + self.os // 2,
206 self.os * (self.sc - 1) + self.os // 2 - 1,
207 self.os * (self.sc + 1) + self.os // 2,
208 self.os * (self.sc + 1) + self.os // 2 + 1,
209 # More than stripe_count objects will exist
210 self.os * self.sc + self.os * self.sc // 2
211 ]
212
213 def write(self):
214 # Create a dir with a striped layout set on it
215 self._mount.run_shell(["mkdir", "stripey"])
216
217 self._mount.setfattr("./stripey", "ceph.dir.layout",
218 "stripe_unit={ss} stripe_count={sc} object_size={os} pool={pool}".format(
219 ss=self.ss, os=self.os, sc=self.sc, pool=self.pool
220 ))
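# With the values chosen in __init__ the attribute set above expands to, e.g.
# (the pool being whichever one the workload was constructed with):
#   stripe_unit=65536 stripe_count=4 object_size=262144 pool=<data pool name>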
221
222         # Write files, then flush metadata so that their layouts get written into xattrs
223 for i, n_bytes in enumerate(self.interesting_sizes):
224 self._mount.write_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
225 # This is really just validating the validator
226 self._mount.validate_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
227 self._filesystem.mds_asok(["flush", "journal"])
228
229 # Write another file in the same way, but this time don't flush the metadata,
230 # so that it won't have the layout xattr
231 self._mount.write_test_pattern("stripey/unflushed_file", 1024 * 512)
232 self._mount.validate_test_pattern("stripey/unflushed_file", 1024 * 512)
233
234 self._initial_state = {
235 "unflushed_ino": self._mount.path_to_ino("stripey/unflushed_file")
236 }
237
238 def flush(self):
239 # Pass because we already selectively flushed during write
240 pass
241
242 def validate(self):
243         # The flushed files should have been recovered into their original locations
244         # with the correct layout: read back correct data
245 for i, n_bytes in enumerate(self.interesting_sizes):
246 try:
247 self._mount.validate_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
248 except CommandFailedError as e:
249 self._errors.append(
250 ValidationError("File {0} (size {1}): {2}".format(i, n_bytes, e), traceback.format_exc(3))
251 )
252
253 # The unflushed file should have been recovered into lost+found without
254 # the correct layout: read back junk
255 ino_name = "%x" % self._initial_state["unflushed_ino"]
256 self.assert_equal(self._mount.ls("lost+found", sudo=True), [ino_name])
257 try:
258 self._mount.validate_test_pattern(os.path.join("lost+found", ino_name), 1024 * 512)
259 except CommandFailedError:
260 pass
261 else:
262 self._errors.append(
263 ValidationError("Unexpectedly valid data in unflushed striped file", "")
264 )
265
266 return self._errors
267
268
269 class ManyFilesWorkload(Workload):
270 def __init__(self, filesystem, mount, file_count):
271 super(ManyFilesWorkload, self).__init__(filesystem, mount)
272 self.file_count = file_count
273
274 def write(self):
275 self._mount.run_shell(["mkdir", "subdir"])
276 for n in range(0, self.file_count):
277 self._mount.write_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)
278
279 def validate(self):
280 for n in range(0, self.file_count):
281 try:
282 self._mount.validate_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)
283 except CommandFailedError as e:
284 self._errors.append(
285 ValidationError("File {0}: {1}".format(n, e), traceback.format_exc(3))
286 )
287
288 return self._errors
289
290
291 class MovedDir(Workload):
292 def write(self):
293         # Create a nested dir that we will then move. Write two files with two different
294         # backtraces referring to the moved dir, claiming two different locations for
295         # it. We will see that only one backtrace wins and the dir ends up with a
296         # single linkage.
297 self._mount.run_shell(["mkdir", "-p", "grandmother/parent"])
298 self._mount.write_n_mb("grandmother/parent/orig_pos_file", 1)
299 self._filesystem.mds_asok(["flush", "journal"])
300 self._mount.run_shell(["mkdir", "grandfather"])
301 self._mount.run_shell(["mv", "grandmother/parent", "grandfather"])
302 self._mount.write_n_mb("grandfather/parent/new_pos_file", 2)
303 self._filesystem.mds_asok(["flush", "journal"])
304
305 self._initial_state = (
306 self._mount.stat("grandfather/parent/orig_pos_file"),
307 self._mount.stat("grandfather/parent/new_pos_file")
308 )
309
310 def validate(self):
311 root_files = self._mount.ls()
312 self.assert_equal(len(root_files), 1)
313 self.assert_equal(root_files[0] in ["grandfather", "grandmother"], True)
314 winner = root_files[0]
315 st_opf = self._mount.stat(f"{winner}/parent/orig_pos_file", sudo=True)
316 st_npf = self._mount.stat(f"{winner}/parent/new_pos_file", sudo=True)
317
318 self.assert_equal(st_opf['st_size'], self._initial_state[0]['st_size'])
319 self.assert_equal(st_npf['st_size'], self._initial_state[1]['st_size'])
320
321
322 class MissingZerothObject(Workload):
323 def write(self):
324 self._mount.run_shell(["mkdir", "subdir"])
325 self._mount.write_n_mb("subdir/sixmegs", 6)
326 self._initial_state = self._mount.stat("subdir/sixmegs")
327
328 def damage(self):
329 super(MissingZerothObject, self).damage()
330 zeroth_id = "{0:x}.00000000".format(self._initial_state['st_ino'])
331 self._filesystem.rados(["rm", zeroth_id], pool=self._filesystem.get_data_pool_name())
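# Data objects are named "<inode (hex)>.<object index, 8 hex digits>", so the
# call above removes the file's zeroth object. That object also carries the
# backtrace xattr, which is why validate() expects the file to turn up in
# lost+found rather than at its original path.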
332
333 def validate(self):
334 ino = self._initial_state['st_ino']
335 st = self._mount.stat(f"lost+found/{ino:x}", sudo=True)
336 self.assert_equal(st['st_size'], self._initial_state['st_size'])
337
338
339 class NonDefaultLayout(Workload):
340 """
341 Check that the reconstruction copes with files that have a different
342 object size in their layout
343 """
344 def write(self):
345 self._mount.run_shell(["touch", "datafile"])
346 self._mount.setfattr("./datafile", "ceph.file.layout.object_size", "8388608")
347 self._mount.run_shell(["dd", "if=/dev/urandom", "of=./datafile", "bs=1M", "count=32"])
348 self._initial_state = self._mount.stat("datafile")
349
350 def validate(self):
351 # Check we got the layout reconstructed properly
352 object_size = int(self._mount.getfattr("./datafile", "ceph.file.layout.object_size", sudo=True))
353 self.assert_equal(object_size, 8388608)
354
355 # Check we got the file size reconstructed properly
356 st = self._mount.stat("datafile", sudo=True)
357 self.assert_equal(st['st_size'], self._initial_state['st_size'])
358
359
360 class TestDataScan(CephFSTestCase):
361 MDSS_REQUIRED = 2
362
363 def is_marked_damaged(self, rank):
364 mds_map = self.fs.get_mds_map()
365 return rank in mds_map['damaged']
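# (The MDS map's 'damaged' field lists the ranks currently marked damaged;
# _rebuild_metadata() below waits for rank 0 to appear there once the metadata
# pool has been wiped.)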
366
367 def _rebuild_metadata(self, workload, workers=1):
368 """
369         That when all objects in the metadata pool are removed, we can rebuild the metadata pool
370         based on the contents of the data pool, and a client can see and read our files.
371 """
372
373 # First, inject some files
374
375 workload.write()
376
377 # Unmount the client and flush the journal: the tool should also cope with
378 # situations where there is dirty metadata, but we'll test that separately
379 self.mount_a.umount_wait()
380 workload.flush()
381
382 # Stop the MDS
383 self.fs.fail()
384
385 # After recovery, we need the MDS to not be strict about stats (in production these options
386 # are off by default, but in QA we need to explicitly disable them)
387 self.fs.set_ceph_conf('mds', 'mds verify scatter', False)
388 self.fs.set_ceph_conf('mds', 'mds debug scatterstat', False)
389
390 # Apply any data damage the workload wants
391 workload.damage()
392
393         # Reset the MDS map in case multiple ranks were in play: the recovery procedure
394         # only understands how to rebuild metadata under rank 0
395 self.fs.reset()
396
397 self.fs.set_joinable() # redundant with reset
398
399 def get_state(mds_id):
400 info = self.mds_cluster.get_mds_info(mds_id)
401 return info['state'] if info is not None else None
402
403 self.wait_until_true(lambda: self.is_marked_damaged(0), 60)
404 for mds_id in self.fs.mds_ids:
405 self.wait_until_equal(
406 lambda: get_state(mds_id),
407 "up:standby",
408 timeout=60)
409
410 self.fs.table_tool([self.fs.name + ":0", "reset", "session"])
411 self.fs.table_tool([self.fs.name + ":0", "reset", "snap"])
412 self.fs.table_tool([self.fs.name + ":0", "reset", "inode"])
413
414 # Run the recovery procedure
415 if False:
416 with self.assertRaises(CommandFailedError):
417                 # Normal reset should fail when no objects are present; we'll use --force instead
418 self.fs.journal_tool(["journal", "reset"], 0)
419
420 self.fs.journal_tool(["journal", "reset", "--force"], 0)
421 self.fs.data_scan(["init"])
422 self.fs.data_scan(["scan_extents"], worker_count=workers)
423 self.fs.data_scan(["scan_inodes"], worker_count=workers)
424 self.fs.data_scan(["scan_links"])
425
426 # Mark the MDS repaired
427 self.fs.mon_manager.raw_cluster_cmd('mds', 'repaired', '0')
428
429 # Start the MDS
430 self.fs.mds_restart()
431 self.fs.wait_for_daemons()
432 log.info(str(self.mds_cluster.status()))
433
434 # Mount a client
435 self.mount_a.mount_wait()
436
437         # Run scrub, as it is recommended post-recovery for most
438         # (if not all) recovery mechanisms.
439 workload.scrub()
440
441 # See that the files are present and correct
442 errors = workload.validate()
443 if errors:
444 log.error("Validation errors found: {0}".format(len(errors)))
445 for e in errors:
446 log.error(e.exception)
447 log.error(e.backtrace)
448 raise AssertionError("Validation failed, first error: {0}\n{1}".format(
449 errors[0].exception, errors[0].backtrace
450 ))
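# For reference, the wrappers driven above correspond roughly to the following
# command-line recovery sequence (a sketch -- exact arguments, such as an
# explicit data pool for scan_extents/scan_inodes or the --rank syntax, can
# vary by release; see the upstream disaster-recovery docs):
#
#   cephfs-table-tool <fs>:0 reset session
#   cephfs-table-tool <fs>:0 reset snap
#   cephfs-table-tool <fs>:0 reset inode
#   cephfs-journal-tool --rank=<fs>:0 journal reset --force
#   cephfs-data-scan init
#   cephfs-data-scan scan_extents
#   cephfs-data-scan scan_inodes
#   cephfs-data-scan scan_links
#   ceph mds repaired 0
#
# scan_extents and scan_inodes are the phases that can be parallelised across
# several workers, which is what the workers= argument above exercises.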
451
452 def test_rebuild_simple(self):
453 self._rebuild_metadata(SimpleWorkload(self.fs, self.mount_a))
454
455 def test_rebuild_symlink(self):
456 self._rebuild_metadata(SymlinkWorkload(self.fs, self.mount_a))
457
458 def test_rebuild_moved_file(self):
459 self._rebuild_metadata(MovedFile(self.fs, self.mount_a))
460
461 def test_rebuild_backtraceless(self):
462 self._rebuild_metadata(BacktracelessFile(self.fs, self.mount_a))
463
464 def test_rebuild_moved_dir(self):
465 self._rebuild_metadata(MovedDir(self.fs, self.mount_a))
466
467 def test_rebuild_missing_zeroth(self):
468 self._rebuild_metadata(MissingZerothObject(self.fs, self.mount_a))
469
470 def test_rebuild_nondefault_layout(self):
471 self._rebuild_metadata(NonDefaultLayout(self.fs, self.mount_a))
472
473 def test_stashed_layout(self):
474 self._rebuild_metadata(StripedStashedLayout(self.fs, self.mount_a))
475
476 def _dirfrag_keys(self, object_id):
477 keys_str = self.fs.radosmo(["listomapkeys", object_id], stdout=StringIO())
478 if keys_str:
479 return keys_str.strip().split("\n")
480 else:
481 return []
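# Dirfrag objects store one omap entry per dentry; for a live (non-snapshot)
# dentry the key is "<name>_head", which is why the tests below append or strip
# the "_head" suffix when mapping between file names and omap keys.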
482
483 def test_fragmented_injection(self):
484 """
485 That when injecting a dentry into a fragmented directory, we put it in the right fragment.
486 """
487
488 file_count = 100
489 file_names = ["%s" % n for n in range(0, file_count)]
490
491         # Make sure to disable dirfrag auto merging and splitting
492 self.fs.set_ceph_conf('mds', 'mds bal merge size', 0)
493 self.fs.set_ceph_conf('mds', 'mds bal split size', 100 * file_count)
494
495 # Create a directory of `file_count` files, each named after its
496 # decimal number and containing the string of its decimal number
497 self.mount_a.run_python(dedent("""
498 import os
499 path = os.path.join("{path}", "subdir")
500 os.mkdir(path)
501 for n in range(0, {file_count}):
502 open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
503 """.format(
504 path=self.mount_a.mountpoint,
505 file_count=file_count
506 )))
507
508 dir_ino = self.mount_a.path_to_ino("subdir")
509
510 # Only one MDS should be active!
511 self.assertEqual(len(self.fs.get_active_names()), 1)
512
513 # Ensure that one directory is fragmented
514 mds_id = self.fs.get_active_names()[0]
515 self.fs.mds_asok(["dirfrag", "split", "/subdir", "0/0", "1"], mds_id)
516
517 # Flush journal and stop MDS
518 self.mount_a.umount_wait()
519 self.fs.mds_asok(["flush", "journal"], mds_id)
520 self.fs.fail()
521
522 # Pick a dentry and wipe out its key
523         # Because we did a 1-bit split, we know one frag will be named <inode>.01000000
524 frag_obj_id = "{0:x}.01000000".format(dir_ino)
525 keys = self._dirfrag_keys(frag_obj_id)
526 victim_key = keys[7] # arbitrary choice
527 log.info("victim_key={0}".format(victim_key))
528 victim_dentry = victim_key.split("_head")[0]
529 self.fs.radosm(["rmomapkey", frag_obj_id, victim_key])
530
531         # Start the filesystem back up and observe that the file appears to be gone in an `ls`
532 self.fs.set_joinable()
533 self.fs.wait_for_daemons()
534 self.mount_a.mount_wait()
535 files = self.mount_a.run_shell(["ls", "subdir/"]).stdout.getvalue().strip().split("\n")
536 self.assertListEqual(sorted(files), sorted(list(set(file_names) - set([victim_dentry]))))
537
538 # Stop the filesystem
539 self.mount_a.umount_wait()
540 self.fs.fail()
541
542         # Run data-scan and observe that it inserts our dentry back into the correct fragment,
543         # by checking that the omap now has the dentry's key again
544 self.fs.data_scan(["scan_extents"])
545 self.fs.data_scan(["scan_inodes"])
546 self.fs.data_scan(["scan_links"])
547 self.assertIn(victim_key, self._dirfrag_keys(frag_obj_id))
548
549 # Start the filesystem and check that the dentry we deleted is now once again visible
550 # and points to the correct file data.
551 self.fs.set_joinable()
552 self.fs.wait_for_daemons()
553 self.mount_a.mount_wait()
554 self.mount_a.run_shell(["ls", "-l", "subdir/"]) # debugging
555         # Use sudo because cephfs-data-scan will reinsert the dentry with root ownership; it can't know the real owner.
556 out = self.mount_a.run_shell_payload(f"sudo cat subdir/{victim_dentry}", omit_sudo=False).stdout.getvalue().strip()
557 self.assertEqual(out, victim_dentry)
558
559 # Finally, close the loop by checking our injected dentry survives a merge
560 mds_id = self.fs.get_active_names()[0]
561 self.mount_a.ls("subdir") # Do an ls to ensure both frags are in cache so the merge will work
562 self.fs.mds_asok(["dirfrag", "merge", "/subdir", "0/0"], mds_id)
563 self.fs.mds_asok(["flush", "journal"], mds_id)
564 frag_obj_id = "{0:x}.00000000".format(dir_ino)
565 keys = self._dirfrag_keys(frag_obj_id)
566 self.assertListEqual(sorted(keys), sorted(["%s_head" % f for f in file_names]))
567
568         # Run scrub to update rstat.rbytes and make sure the info in the subdir inode and
569         # its dirfrag match
570 out_json = self.fs.run_scrub(["start", "/subdir", "repair,recursive"])
571 self.assertNotEqual(out_json, None)
572 self.assertEqual(out_json["return_code"], 0)
573 self.assertEqual(self.fs.wait_until_scrub_complete(tag=out_json["scrub_tag"]), True)
574
576         # Remove the whole 'subdir' directory
576 self.mount_a.run_shell(["rm", "-rf", "subdir/"])
577
578 @for_teuthology
579 def test_parallel_execution(self):
580 self._rebuild_metadata(ManyFilesWorkload(self.fs, self.mount_a, 25), workers=7)
581
582 def test_pg_files(self):
583 """
584 That the pg files command tells us which files are associated with
585 a particular PG
586 """
587 file_count = 20
588 self.mount_a.run_shell(["mkdir", "mydir"])
589 self.mount_a.create_n_files("mydir/myfile", file_count)
590
591         # Create some files elsewhere in the filesystem that we will ignore,
592         # to check that the tool is filtering properly
593 self.mount_a.run_shell(["mkdir", "otherdir"])
594 self.mount_a.create_n_files("otherdir/otherfile", file_count)
595
596 pgs_to_files = defaultdict(list)
597 # Rough (slow) reimplementation of the logic
598 for i in range(0, file_count):
599 file_path = "mydir/myfile_{0}".format(i)
600 ino = self.mount_a.path_to_ino(file_path)
601 obj = "{0:x}.{1:08x}".format(ino, 0)
602 pgid = json.loads(self.fs.mon_manager.raw_cluster_cmd(
603 "osd", "map", self.fs.get_data_pool_name(), obj,
604 "--format=json-pretty"
605 ))['pgid']
606 pgs_to_files[pgid].append(file_path)
607 log.info("{0}: {1}".format(file_path, pgid))
608
609 pg_count = self.fs.get_pool_pg_num(self.fs.get_data_pool_name())
610 for pg_n in range(0, pg_count):
611 pg_str = "{0}.{1:x}".format(self.fs.get_data_pool_id(), pg_n)
612 out = self.fs.data_scan(["pg_files", "mydir", pg_str])
613 lines = [l for l in out.split("\n") if l]
614 log.info("{0}: {1}".format(pg_str, lines))
615 self.assertSetEqual(set(lines), set(pgs_to_files[pg_str]))
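# The self.fs.data_scan(["pg_files", ...]) wrapper above corresponds roughly to:
#   cephfs-data-scan pg_files <path> <pgid> [<pgid>...]
# (a sketch of the CLI form; check `cephfs-data-scan --help` on your release
# for the exact syntax).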
616
617 def test_rebuild_linkage(self):
618 """
619 The scan_links command fixes linkage errors
620 """
621 self.mount_a.run_shell(["mkdir", "testdir1"])
622 self.mount_a.run_shell(["mkdir", "testdir2"])
623 dir1_ino = self.mount_a.path_to_ino("testdir1")
624 dir2_ino = self.mount_a.path_to_ino("testdir2")
625 dirfrag1_oid = "{0:x}.00000000".format(dir1_ino)
626 dirfrag2_oid = "{0:x}.00000000".format(dir2_ino)
627
628 self.mount_a.run_shell(["touch", "testdir1/file1"])
629 self.mount_a.run_shell(["ln", "testdir1/file1", "testdir1/link1"])
630 self.mount_a.run_shell(["ln", "testdir1/file1", "testdir2/link2"])
631
632 mds_id = self.fs.get_active_names()[0]
633 self.fs.mds_asok(["flush", "journal"], mds_id)
634
635 dirfrag1_keys = self._dirfrag_keys(dirfrag1_oid)
636
637         # introduce a duplicated primary link
638 file1_key = "file1_head"
639 self.assertIn(file1_key, dirfrag1_keys)
640 file1_omap_data = self.fs.radosmo(["getomapval", dirfrag1_oid, file1_key, '-'])
641 self.fs.radosm(["setomapval", dirfrag2_oid, file1_key], stdin=BytesIO(file1_omap_data))
642 self.assertIn(file1_key, self._dirfrag_keys(dirfrag2_oid))
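# The omap value of a primary dentry embeds the inode itself, so copying it
# verbatim into dirfrag2 leaves the same inode primary-linked from two
# directories -- exactly the kind of duplication scan_links must resolve.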
643
644         # remove a remote link, making the inode link count incorrect
645 link1_key = 'link1_head'
646 self.assertIn(link1_key, dirfrag1_keys)
647 self.fs.radosm(["rmomapkey", dirfrag1_oid, link1_key])
648
649 # increase good primary link's version
650 self.mount_a.run_shell(["touch", "testdir1/file1"])
651 self.mount_a.umount_wait()
652
653 self.fs.mds_asok(["flush", "journal"], mds_id)
654 self.fs.fail()
655
656 # repair linkage errors
657 self.fs.data_scan(["scan_links"])
658
659         # the duplicated primary link in testdir2 should have been deleted
660 self.assertNotIn(file1_key, self._dirfrag_keys(dirfrag2_oid))
661
662 self.fs.set_joinable()
663 self.fs.wait_for_daemons()
664
665 self.mount_a.mount_wait()
666
667         # the inode's link count should have been adjusted
668 file1_nlink = self.mount_a.path_to_nlink("testdir1/file1")
669 self.assertEqual(file1_nlink, 2)
670
671 out_json = self.fs.run_scrub(["start", "/testdir1", "repair,recursive"])
672 self.assertNotEqual(out_json, None)
673 self.assertEqual(out_json["return_code"], 0)
674 self.assertEqual(self.fs.wait_until_scrub_complete(tag=out_json["scrub_tag"]), True)
675
676 def test_rebuild_inotable(self):
677 """
678         The scan_links command repairs the per-rank InoTables
679 """
680 self.fs.set_max_mds(2)
681 self.fs.wait_for_daemons()
682
683 active_mds_names = self.fs.get_active_names()
684 mds0_id = active_mds_names[0]
685 mds1_id = active_mds_names[1]
686
687 self.mount_a.run_shell(["mkdir", "dir1"])
688 dir_ino = self.mount_a.path_to_ino("dir1")
689 self.mount_a.setfattr("dir1", "ceph.dir.pin", "1")
690         # Wait for the subtree to migrate to rank 1: keep recreating the file until it gets an inode allocated by mds.1 (inode number >= 2 << 40)
691
692         file_ino = 0
693 while True:
694 time.sleep(1)
695 # allocate an inode from mds.1
696 self.mount_a.run_shell(["touch", "dir1/file1"])
697 file_ino = self.mount_a.path_to_ino("dir1/file1")
698 if file_ino >= (2 << 40):
699 break
700 self.mount_a.run_shell(["rm", "-f", "dir1/file1"])
701
702 self.mount_a.umount_wait()
703
704 self.fs.mds_asok(["flush", "journal"], mds0_id)
705 self.fs.mds_asok(["flush", "journal"], mds1_id)
706 self.fs.fail()
707
708 self.fs.radosm(["rm", "mds0_inotable"])
709 self.fs.radosm(["rm", "mds1_inotable"])
710
711 self.fs.data_scan(["scan_links", "--filesystem", self.fs.name])
712
713 mds0_inotable = json.loads(self.fs.table_tool([self.fs.name + ":0", "show", "inode"]))
714 self.assertGreaterEqual(
715 mds0_inotable['0']['data']['inotable']['free'][0]['start'], dir_ino)
716
717 mds1_inotable = json.loads(self.fs.table_tool([self.fs.name + ":1", "show", "inode"]))
718 self.assertGreaterEqual(
719 mds1_inotable['1']['data']['inotable']['free'][0]['start'], file_ino)
720
721 self.fs.set_joinable()
722 self.fs.wait_for_daemons()
723
724 out_json = self.fs.run_scrub(["start", "/dir1", "repair,recursive"])
725 self.assertNotEqual(out_json, None)
726 self.assertEqual(out_json["return_code"], 0)
727 self.assertEqual(self.fs.wait_until_scrub_complete(tag=out_json["scrub_tag"]), True)
728
729 def test_rebuild_snaptable(self):
730 """
731         The scan_links command repairs the snap table
732 """
733 self.fs.set_allow_new_snaps(True)
734
735 self.mount_a.run_shell(["mkdir", "dir1"])
736 self.mount_a.run_shell(["mkdir", "dir1/.snap/s1"])
737 self.mount_a.run_shell(["mkdir", "dir1/.snap/s2"])
738 self.mount_a.run_shell(["rmdir", "dir1/.snap/s2"])
739
740 self.mount_a.umount_wait()
741
742 mds0_id = self.fs.get_active_names()[0]
743 self.fs.mds_asok(["flush", "journal"], mds0_id)
744
745 # wait for mds to update removed snaps
746 time.sleep(10)
747
748 old_snaptable = json.loads(self.fs.table_tool([self.fs.name + ":0", "show", "snap"]))
749         # stamps may have minor differences
750 for item in old_snaptable['snapserver']['snaps']:
751 del item['stamp']
752
753 self.fs.radosm(["rm", "mds_snaptable"])
754 self.fs.data_scan(["scan_links", "--filesystem", self.fs.name])
755
756 new_snaptable = json.loads(self.fs.table_tool([self.fs.name + ":0", "show", "snap"]))
757 for item in new_snaptable['snapserver']['snaps']:
758 del item['stamp']
759 self.assertGreaterEqual(
760 new_snaptable['snapserver']['last_snap'], old_snaptable['snapserver']['last_snap'])
761 self.assertEqual(
762 new_snaptable['snapserver']['snaps'], old_snaptable['snapserver']['snaps'])
763
764 out_json = self.fs.run_scrub(["start", "/dir1", "repair,recursive"])
765 self.assertNotEqual(out_json, None)
766 self.assertEqual(out_json["return_code"], 0)
767 self.assertEqual(self.fs.wait_until_scrub_complete(tag=out_json["scrub_tag"]), True)
768
769 def _prepare_extra_data_pool(self, set_root_layout=True):
770 extra_data_pool_name = self.fs.get_data_pool_name() + '_extra'
771 self.fs.add_data_pool(extra_data_pool_name)
772 if set_root_layout:
773 self.mount_a.setfattr(".", "ceph.dir.layout.pool",
774 extra_data_pool_name)
775 return extra_data_pool_name
776
777 def test_extra_data_pool_rebuild_simple(self):
778 self._prepare_extra_data_pool()
779 self._rebuild_metadata(SimpleWorkload(self.fs, self.mount_a))
780
781 def test_extra_data_pool_rebuild_few_files(self):
782 self._prepare_extra_data_pool()
783 self._rebuild_metadata(ManyFilesWorkload(self.fs, self.mount_a, 5), workers=1)
784
785 @for_teuthology
786 def test_extra_data_pool_rebuild_many_files_many_workers(self):
787 self._prepare_extra_data_pool()
788 self._rebuild_metadata(ManyFilesWorkload(self.fs, self.mount_a, 25), workers=7)
789
790 def test_extra_data_pool_stashed_layout(self):
791 pool_name = self._prepare_extra_data_pool(False)
792 self._rebuild_metadata(StripedStashedLayout(self.fs, self.mount_a, pool_name))