"""
Test our tools for recovering metadata from the data pool
"""
import json

import logging
import os
import time
import traceback
import stat

from io import BytesIO, StringIO
from collections import namedtuple, defaultdict
from textwrap import dedent

from teuthology.exceptions import CommandFailedError
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology

log = logging.getLogger(__name__)


ValidationError = namedtuple("ValidationError", ["exception", "backtrace"])


class Workload(object):
    def __init__(self, filesystem, mount):
        self._mount = mount
        self._filesystem = filesystem
        self._initial_state = None

        # Accumulate backtraces for every failed validation, and return them. Backtraces
        # are rather verbose, but we only see them when something breaks, and they
        # let us see which check failed without having to decorate each check with
        # a string
        self._errors = []

    def assert_equal(self, a, b):
        try:
            if a != b:
                raise AssertionError("{0} != {1}".format(a, b))
        except AssertionError as e:
            self._errors.append(
                ValidationError(e, traceback.format_exc(3))
            )

    def assert_not_equal(self, a, b):
        try:
            if a == b:
                raise AssertionError("{0} == {1}".format(a, b))
        except AssertionError as e:
            self._errors.append(
                ValidationError(e, traceback.format_exc(3))
            )

    def assert_true(self, a):
        try:
            if not a:
                raise AssertionError("{0} is not true".format(a))
        except AssertionError as e:
            self._errors.append(
                ValidationError(e, traceback.format_exc(3))
            )

    def write(self):
        """
        Write the workload files to the mount
        """
        raise NotImplementedError()

    def validate(self):
        """
        Read from the mount and validate that the workload files are present (i.e. have
        survived or been reconstructed from the test scenario)
        """
        raise NotImplementedError()

    def damage(self):
        """
        Damage the filesystem pools in ways that will be interesting to recover from. By
        default just wipe everything in the metadata pool
        """
        # Delete every object in the metadata pool
        pool = self._filesystem.get_metadata_pool_name()
        self._filesystem.rados(["purge", pool, '--yes-i-really-really-mean-it'])

    def flush(self):
        """
        Called after client unmount, after write: flush whatever you want
        """
        self._filesystem.mds_asok(["flush", "journal"])

    def scrub(self):
        """
        Called as a final step post recovery, before verification. Right now this
        doesn't check whether scrub finds errors - just that the MDS doesn't
        crash and burn during the scrub.
        """
        out_json = self._filesystem.run_scrub(["start", "/", "repair,recursive"])
        self.assert_not_equal(out_json, None)
        self.assert_equal(out_json["return_code"], 0)
        self.assert_equal(self._filesystem.wait_until_scrub_complete(tag=out_json["scrub_tag"]), True)

class SimpleWorkload(Workload):
    """
    Single file in a single directory: check that it gets recovered, and so does its size
    """
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def validate(self):
        self._mount.run_shell(["sudo", "ls", "subdir"], omit_sudo=False)
        st = self._mount.stat("subdir/sixmegs", sudo=True)
        self.assert_equal(st['st_size'], self._initial_state['st_size'])
        return self._errors


class SymlinkWorkload(Workload):
    """
    Symlinked files: check that they get recovered as symlinks
    """
    def write(self):
        self._mount.run_shell(["mkdir", "symdir"])
        self._mount.write_n_mb("symdir/onemegs", 1)
        self._mount.run_shell(["ln", "-s", "onemegs", "symdir/symlink_onemegs"])
        self._mount.run_shell(["ln", "-s", "symdir/onemegs", "symlink1_onemegs"])

    def validate(self):
        self._mount.run_shell(["sudo", "ls", "symdir"], omit_sudo=False)
        st = self._mount.lstat("symdir/symlink_onemegs")
        self.assert_true(stat.S_ISLNK(st['st_mode']))
        target = self._mount.readlink("symdir/symlink_onemegs")
        self.assert_equal(target, "onemegs")

        st = self._mount.lstat("symlink1_onemegs")
        self.assert_true(stat.S_ISLNK(st['st_mode']))
        target = self._mount.readlink("symlink1_onemegs")
        self.assert_equal(target, "symdir/onemegs")
        return self._errors


class MovedFile(Workload):
    def write(self):
        # Create a file whose backtrace disagrees with its eventual position
        # in the metadata. We will see that it gets reconstructed in its
        # original position according to its backtrace.
        self._mount.run_shell(["mkdir", "subdir_alpha"])
        self._mount.run_shell(["mkdir", "subdir_bravo"])
        self._mount.write_n_mb("subdir_alpha/sixmegs", 6)
        self._filesystem.mds_asok(["flush", "journal"])
        self._mount.run_shell(["mv", "subdir_alpha/sixmegs", "subdir_bravo/sixmegs"])
        self._initial_state = self._mount.stat("subdir_bravo/sixmegs")

    def flush(self):
        pass

    def validate(self):
        self.assert_equal(self._mount.ls(sudo=True), ["subdir_alpha"])
        st = self._mount.stat("subdir_alpha/sixmegs", sudo=True)
        self.assert_equal(st['st_size'], self._initial_state['st_size'])
        return self._errors


class BacktracelessFile(Workload):
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def flush(self):
        # Never flush metadata, so backtrace won't be written
        pass

    def validate(self):
        ino_name = "%x" % self._initial_state["st_ino"]

        # The inode should be linked into lost+found because we had no path for it
        self.assert_equal(self._mount.ls(sudo=True), ["lost+found"])
        self.assert_equal(self._mount.ls("lost+found", sudo=True), [ino_name])
        st = self._mount.stat(f"lost+found/{ino_name}", sudo=True)

        # We might not have got the name or path, but we should still get the size
        self.assert_equal(st['st_size'], self._initial_state['st_size'])

        # remove the entry from the lost+found directory
        self._mount.run_shell(["sudo", "rm", "-f", f'lost+found/{ino_name}'], omit_sudo=False)
        self.assert_equal(self._mount.ls("lost+found", sudo=True), [])

        return self._errors


class StripedStashedLayout(Workload):
    def __init__(self, fs, m, pool=None):
        super(StripedStashedLayout, self).__init__(fs, m)

        # Nice small stripes so we can quickly do our writes+validates
        self.sc = 4
        self.ss = 65536
        self.os = 262144
        self.pool = pool or self._filesystem.get_data_pool_name()

        self.interesting_sizes = [
            # Exactly stripe_count objects will exist
            self.os * self.sc,
            # Fewer than stripe_count objects will exist
            self.os * self.sc // 2,
            self.os * (self.sc - 1) + self.os // 2,
            self.os * (self.sc - 1) + self.os // 2 - 1,
            self.os * (self.sc + 1) + self.os // 2,
            self.os * (self.sc + 1) + self.os // 2 + 1,
            # More than stripe_count objects will exist
            self.os * self.sc + self.os * self.sc // 2
        ]

    def write(self):
        # Create a dir with a striped layout set on it
        self._mount.run_shell(["mkdir", "stripey"])

        self._mount.setfattr("./stripey", "ceph.dir.layout",
                             "stripe_unit={ss} stripe_count={sc} object_size={os} pool={pool}".format(
                                 ss=self.ss, os=self.os, sc=self.sc, pool=self.pool
                             ))

        # Write files, then flush metadata so that their layouts get written into xattrs
        for i, n_bytes in enumerate(self.interesting_sizes):
            self._mount.write_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
            # This is really just validating the validator
            self._mount.validate_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
        self._filesystem.mds_asok(["flush", "journal"])

        # Write another file in the same way, but this time don't flush the metadata,
        # so that it won't have the layout xattr
        self._mount.write_test_pattern("stripey/unflushed_file", 1024 * 512)
        self._mount.validate_test_pattern("stripey/unflushed_file", 1024 * 512)

        self._initial_state = {
            "unflushed_ino": self._mount.path_to_ino("stripey/unflushed_file")
        }

    def flush(self):
        # Pass because we already selectively flushed during write
        pass

    def validate(self):
        # The flushed files should have been recovered into their original locations
        # with the correct layout: read back correct data
        for i, n_bytes in enumerate(self.interesting_sizes):
            try:
                self._mount.validate_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
            except CommandFailedError as e:
                self._errors.append(
                    ValidationError("File {0} (size {1}): {2}".format(i, n_bytes, e), traceback.format_exc(3))
                )

        # The unflushed file should have been recovered into lost+found without
        # the correct layout: read back junk
        ino_name = "%x" % self._initial_state["unflushed_ino"]
        self.assert_equal(self._mount.ls("lost+found", sudo=True), [ino_name])
        try:
            self._mount.validate_test_pattern(os.path.join("lost+found", ino_name), 1024 * 512)
        except CommandFailedError:
            pass
        else:
            self._errors.append(
                ValidationError("Unexpectedly valid data in unflushed striped file", "")
            )

        return self._errors


class ManyFilesWorkload(Workload):
    def __init__(self, filesystem, mount, file_count):
        super(ManyFilesWorkload, self).__init__(filesystem, mount)
        self.file_count = file_count

    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        for n in range(0, self.file_count):
            self._mount.write_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)

    def validate(self):
        for n in range(0, self.file_count):
            try:
                self._mount.validate_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)
            except CommandFailedError as e:
                self._errors.append(
                    ValidationError("File {0}: {1}".format(n, e), traceback.format_exc(3))
                )

        return self._errors


class MovedDir(Workload):
    def write(self):
        # Create a nested dir that we will then move. Two files with two different
        # backtraces referring to the moved dir, claiming two different locations for
        # it. We will see that only one backtrace wins and the dir ends up with
        # a single linkage.
        self._mount.run_shell(["mkdir", "-p", "grandmother/parent"])
        self._mount.write_n_mb("grandmother/parent/orig_pos_file", 1)
        self._filesystem.mds_asok(["flush", "journal"])
        self._mount.run_shell(["mkdir", "grandfather"])
        self._mount.run_shell(["mv", "grandmother/parent", "grandfather"])
        self._mount.write_n_mb("grandfather/parent/new_pos_file", 2)
        self._filesystem.mds_asok(["flush", "journal"])

        self._initial_state = (
            self._mount.stat("grandfather/parent/orig_pos_file"),
            self._mount.stat("grandfather/parent/new_pos_file")
        )

    def validate(self):
        root_files = self._mount.ls()
        self.assert_equal(len(root_files), 1)
        self.assert_equal(root_files[0] in ["grandfather", "grandmother"], True)
        winner = root_files[0]
        st_opf = self._mount.stat(f"{winner}/parent/orig_pos_file", sudo=True)
        st_npf = self._mount.stat(f"{winner}/parent/new_pos_file", sudo=True)

        self.assert_equal(st_opf['st_size'], self._initial_state[0]['st_size'])
        self.assert_equal(st_npf['st_size'], self._initial_state[1]['st_size'])


class MissingZerothObject(Workload):
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def damage(self):
        super(MissingZerothObject, self).damage()
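        # In addition to wiping the metadata pool, delete the file's zeroth data
        # object (<ino>.00000000). That object carries the backtrace xattr, so
        # without it data-scan cannot recover the file's original path and the
        # file should land in lost+found (as validate() below expects).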
        zeroth_id = "{0:x}.00000000".format(self._initial_state['st_ino'])
        self._filesystem.rados(["rm", zeroth_id], pool=self._filesystem.get_data_pool_name())

    def validate(self):
        ino = self._initial_state['st_ino']
        st = self._mount.stat(f"lost+found/{ino:x}", sudo=True)
        self.assert_equal(st['st_size'], self._initial_state['st_size'])


class NonDefaultLayout(Workload):
    """
    Check that the reconstruction copes with files that have a non-default
    object size in their layout
    """
    def write(self):
        self._mount.run_shell(["touch", "datafile"])
        self._mount.setfattr("./datafile", "ceph.file.layout.object_size", "8388608")
        self._mount.run_shell(["dd", "if=/dev/urandom", "of=./datafile", "bs=1M", "count=32"])
        self._initial_state = self._mount.stat("datafile")

    def validate(self):
        # Check that the layout was reconstructed properly
        object_size = int(self._mount.getfattr("./datafile", "ceph.file.layout.object_size", sudo=True))
        self.assert_equal(object_size, 8388608)

        # Check that the file size was reconstructed properly
        st = self._mount.stat("datafile", sudo=True)
        self.assert_equal(st['st_size'], self._initial_state['st_size'])


class TestDataScan(CephFSTestCase):
    MDSS_REQUIRED = 2

    def is_marked_damaged(self, rank):
        mds_map = self.fs.get_mds_map()
        return rank in mds_map['damaged']

    def _rebuild_metadata(self, workload, workers=1):
        """
        That when all objects in the metadata pool are removed, we can rebuild the metadata
        pool from the contents of the data pool, and a client can see and read our files.
        """

        # First, inject some files

        workload.write()

        # Unmount the client and flush the journal: the tool should also cope with
        # situations where there is dirty metadata, but we'll test that separately
        self.mount_a.umount_wait()
        workload.flush()

        # Stop the MDS
        self.fs.fail()

        # After recovery, we need the MDS to not be strict about stats (in production these options
        # are off by default, but in QA we need to explicitly disable them)
        self.fs.set_ceph_conf('mds', 'mds verify scatter', False)
        self.fs.set_ceph_conf('mds', 'mds debug scatterstat', False)

        # Apply any data damage the workload wants
        workload.damage()

        # Reset the MDS map in case multiple ranks were in play: the recovery procedure
        # only understands how to rebuild metadata under rank 0
        self.fs.reset()

        self.fs.set_joinable() # redundant with reset

        def get_state(mds_id):
            info = self.mds_cluster.get_mds_info(mds_id)
            return info['state'] if info is not None else None

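        # With the metadata pool emptied, whichever daemon claims rank 0 will fail to
        # load it and the rank gets marked damaged; wait for that to happen and for
        # the remaining daemons to settle back into standby.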
        self.wait_until_true(lambda: self.is_marked_damaged(0), 60)
        for mds_id in self.fs.mds_ids:
            self.wait_until_equal(
                lambda: get_state(mds_id),
                "up:standby",
                timeout=60)
        self.fs.table_tool([self.fs.name + ":0", "reset", "session"])
        self.fs.table_tool([self.fs.name + ":0", "reset", "snap"])
        self.fs.table_tool([self.fs.name + ":0", "reset", "inode"])

        # Run the recovery procedure
        if False:
            with self.assertRaises(CommandFailedError):
                # Normal reset should fail when no objects are present, we'll use --force instead
                self.fs.journal_tool(["journal", "reset"], 0)

        self.fs.journal_tool(["journal", "reset", "--force"], 0)
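
        # cephfs-data-scan works in phases: 'init' recreates the root and MDS
        # directory inodes, 'scan_extents' and 'scan_inodes' walk the data pool to
        # reconstruct file sizes, mtimes and inode metadata (these two phases can be
        # parallelised across workers), and 'scan_links' repairs dentry linkage and
        # link counts.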
        self.fs.data_scan(["init"])
        self.fs.data_scan(["scan_extents"], worker_count=workers)
        self.fs.data_scan(["scan_inodes"], worker_count=workers)
        self.fs.data_scan(["scan_links"])

        # Mark the MDS repaired
        self.fs.mon_manager.raw_cluster_cmd('mds', 'repaired', '0')

        # Start the MDS
        self.fs.mds_restart()
        self.fs.wait_for_daemons()
        log.info(str(self.mds_cluster.status()))

        # Mount a client
        self.mount_a.mount_wait()

        # run scrub as it is recommended post recovery for most
        # (if not all) recovery mechanisms.
        workload.scrub()

        # See that the files are present and correct
        errors = workload.validate()
        if errors:
            log.error("Validation errors found: {0}".format(len(errors)))
            for e in errors:
                log.error(e.exception)
                log.error(e.backtrace)
            raise AssertionError("Validation failed, first error: {0}\n{1}".format(
                errors[0].exception, errors[0].backtrace
            ))

    def test_rebuild_simple(self):
        self._rebuild_metadata(SimpleWorkload(self.fs, self.mount_a))

    def test_rebuild_symlink(self):
        self._rebuild_metadata(SymlinkWorkload(self.fs, self.mount_a))

    def test_rebuild_moved_file(self):
        self._rebuild_metadata(MovedFile(self.fs, self.mount_a))

    def test_rebuild_backtraceless(self):
        self._rebuild_metadata(BacktracelessFile(self.fs, self.mount_a))

    def test_rebuild_moved_dir(self):
        self._rebuild_metadata(MovedDir(self.fs, self.mount_a))

    def test_rebuild_missing_zeroth(self):
        self._rebuild_metadata(MissingZerothObject(self.fs, self.mount_a))

    def test_rebuild_nondefault_layout(self):
        self._rebuild_metadata(NonDefaultLayout(self.fs, self.mount_a))

    def test_stashed_layout(self):
        self._rebuild_metadata(StripedStashedLayout(self.fs, self.mount_a))

    def _dirfrag_keys(self, object_id):
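        """
        Return the omap keys (one per dentry) of the given dirfrag object, or an
        empty list if the object has no keys.
        """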
        keys_str = self.fs.radosmo(["listomapkeys", object_id], stdout=StringIO())
        if keys_str:
            return keys_str.strip().split("\n")
        else:
            return []

    def test_fragmented_injection(self):
        """
        That when injecting a dentry into a fragmented directory, we put it in the right fragment.
        """

        file_count = 100
        file_names = ["%s" % n for n in range(0, file_count)]

        # Make sure to disable dirfrag auto merging and splitting
        self.fs.set_ceph_conf('mds', 'mds bal merge size', 0)
        self.fs.set_ceph_conf('mds', 'mds bal split size', 100 * file_count)

        # Create a directory of `file_count` files, each named after its
        # decimal number and containing the string of its decimal number
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir")
            os.mkdir(path)
            for n in range(0, {file_count}):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            """.format(
                path=self.mount_a.mountpoint,
                file_count=file_count
            )))

        dir_ino = self.mount_a.path_to_ino("subdir")

        # Only one MDS should be active!
        self.assertEqual(len(self.fs.get_active_names()), 1)

        # Ensure that one directory is fragmented
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir", "0/0", "1"], mds_id)

        # Flush journal and stop MDS
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.fs.fail()

        # Pick a dentry and wipe out its key
        # Because I did a 1 bit split, I know one frag will be named <inode>.01000000
        frag_obj_id = "{0:x}.01000000".format(dir_ino)
        keys = self._dirfrag_keys(frag_obj_id)
        victim_key = keys[7]  # arbitrary choice
        log.info("victim_key={0}".format(victim_key))
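        # Dirfrag omap keys are of the form "<dentry name>_<snap>"; head dentries
        # use the "head" suffix, so stripping "_head" leaves the plain file name.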
        victim_dentry = victim_key.split("_head")[0]
        self.fs.radosm(["rmomapkey", frag_obj_id, victim_key])

        # Start filesystem back up, observe that the file appears to be gone in an `ls`
        self.fs.set_joinable()
        self.fs.wait_for_daemons()
        self.mount_a.mount_wait()
        files = self.mount_a.run_shell(["ls", "subdir/"]).stdout.getvalue().strip().split("\n")
        self.assertListEqual(sorted(files), sorted(list(set(file_names) - set([victim_dentry]))))

        # Stop the filesystem
        self.mount_a.umount_wait()
        self.fs.fail()

        # Run data-scan, observe that it inserts our dentry back into the correct fragment
        # by checking the omap now has the dentry's key again
        self.fs.data_scan(["scan_extents"])
        self.fs.data_scan(["scan_inodes"])
        self.fs.data_scan(["scan_links"])
        self.assertIn(victim_key, self._dirfrag_keys(frag_obj_id))

        # Start the filesystem and check that the dentry we deleted is now once again visible
        # and points to the correct file data.
        self.fs.set_joinable()
        self.fs.wait_for_daemons()
        self.mount_a.mount_wait()
        self.mount_a.run_shell(["ls", "-l", "subdir/"])  # debugging
        # Use sudo because cephfs-data-scan will reinsert the dentry with root ownership; it can't know the real owner.
        out = self.mount_a.run_shell_payload(f"sudo cat subdir/{victim_dentry}", omit_sudo=False).stdout.getvalue().strip()
        self.assertEqual(out, victim_dentry)

        # Finally, close the loop by checking our injected dentry survives a merge
        mds_id = self.fs.get_active_names()[0]
        self.mount_a.ls("subdir")  # Do an ls to ensure both frags are in cache so the merge will work
        self.fs.mds_asok(["dirfrag", "merge", "/subdir", "0/0"], mds_id)
        self.fs.mds_asok(["flush", "journal"], mds_id)
        frag_obj_id = "{0:x}.00000000".format(dir_ino)
        keys = self._dirfrag_keys(frag_obj_id)
        self.assertListEqual(sorted(keys), sorted(["%s_head" % f for f in file_names]))

        # run scrub to make sure the rstat.rbytes info in the subdir inode and its
        # dirfrag match
        out_json = self.fs.run_scrub(["start", "/subdir", "repair,recursive"])
        self.assertNotEqual(out_json, None)
        self.assertEqual(out_json["return_code"], 0)
        self.assertEqual(self.fs.wait_until_scrub_complete(tag=out_json["scrub_tag"]), True)

        # Remove the whole 'subdir' directory
        self.mount_a.run_shell(["rm", "-rf", "subdir/"])

    @for_teuthology
    def test_parallel_execution(self):
        self._rebuild_metadata(ManyFilesWorkload(self.fs, self.mount_a, 25), workers=7)

    def test_pg_files(self):
        """
        That the pg_files command tells us which files are associated with
        a particular PG
        """
        file_count = 20
        self.mount_a.run_shell(["mkdir", "mydir"])
        self.mount_a.create_n_files("mydir/myfile", file_count)

        # Some files elsewhere in the system that we will ignore
        # to check that the tool is filtering properly
        self.mount_a.run_shell(["mkdir", "otherdir"])
        self.mount_a.create_n_files("otherdir/otherfile", file_count)

        pgs_to_files = defaultdict(list)
        # Rough (slow) reimplementation of the logic
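        # For each file, derive the RADOS name of its first data object
        # (<ino hex>.00000000) and ask 'ceph osd map' which PG that object lands in.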
        for i in range(0, file_count):
            file_path = "mydir/myfile_{0}".format(i)
            ino = self.mount_a.path_to_ino(file_path)
            obj = "{0:x}.{1:08x}".format(ino, 0)
            pgid = json.loads(self.fs.mon_manager.raw_cluster_cmd(
                "osd", "map", self.fs.get_data_pool_name(), obj,
                "--format=json-pretty"
            ))['pgid']
            pgs_to_files[pgid].append(file_path)
            log.info("{0}: {1}".format(file_path, pgid))

        pg_count = self.fs.get_pool_pg_num(self.fs.get_data_pool_name())
        for pg_n in range(0, pg_count):
            pg_str = "{0}.{1:x}".format(self.fs.get_data_pool_id(), pg_n)
            out = self.fs.data_scan(["pg_files", "mydir", pg_str])
            lines = [l for l in out.split("\n") if l]
            log.info("{0}: {1}".format(pg_str, lines))
            self.assertSetEqual(set(lines), set(pgs_to_files[pg_str]))

    def test_rebuild_linkage(self):
        """
        The scan_links command fixes linkage errors (duplicate primary links and
        incorrect link counts)
        """
        self.mount_a.run_shell(["mkdir", "testdir1"])
        self.mount_a.run_shell(["mkdir", "testdir2"])
        dir1_ino = self.mount_a.path_to_ino("testdir1")
        dir2_ino = self.mount_a.path_to_ino("testdir2")
        dirfrag1_oid = "{0:x}.00000000".format(dir1_ino)
        dirfrag2_oid = "{0:x}.00000000".format(dir2_ino)

        self.mount_a.run_shell(["touch", "testdir1/file1"])
        self.mount_a.run_shell(["ln", "testdir1/file1", "testdir1/link1"])
        self.mount_a.run_shell(["ln", "testdir1/file1", "testdir2/link2"])

        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["flush", "journal"], mds_id)

        dirfrag1_keys = self._dirfrag_keys(dirfrag1_oid)

        # introduce a duplicated primary link
        file1_key = "file1_head"
        self.assertIn(file1_key, dirfrag1_keys)
        file1_omap_data = self.fs.radosmo(["getomapval", dirfrag1_oid, file1_key, '-'])
        self.fs.radosm(["setomapval", dirfrag2_oid, file1_key], stdin=BytesIO(file1_omap_data))
        self.assertIn(file1_key, self._dirfrag_keys(dirfrag2_oid))

        # remove a remote link, making the inode's link count incorrect
        link1_key = 'link1_head'
        self.assertIn(link1_key, dirfrag1_keys)
        self.fs.radosm(["rmomapkey", dirfrag1_oid, link1_key])

        # increase the good primary link's version
        self.mount_a.run_shell(["touch", "testdir1/file1"])
        self.mount_a.umount_wait()

        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.fs.fail()

        # repair the linkage errors
        self.fs.data_scan(["scan_links"])

        # check that the duplicate primary link in testdir2 was deleted
        self.assertNotIn(file1_key, self._dirfrag_keys(dirfrag2_oid))

        self.fs.set_joinable()
        self.fs.wait_for_daemons()

        self.mount_a.mount_wait()

        # check that the link count was adjusted
        file1_nlink = self.mount_a.path_to_nlink("testdir1/file1")
        self.assertEqual(file1_nlink, 2)

        out_json = self.fs.run_scrub(["start", "/testdir1", "repair,recursive"])
        self.assertNotEqual(out_json, None)
        self.assertEqual(out_json["return_code"], 0)
        self.assertEqual(self.fs.wait_until_scrub_complete(tag=out_json["scrub_tag"]), True)

    def test_rebuild_inotable(self):
        """
        The scan_links command repairs the inode tables
        """
        self.fs.set_max_mds(2)
        self.fs.wait_for_daemons()

        active_mds_names = self.fs.get_active_names()
        mds0_id = active_mds_names[0]
        mds1_id = active_mds_names[1]

        self.mount_a.run_shell(["mkdir", "dir1"])
        dir_ino = self.mount_a.path_to_ino("dir1")
        self.mount_a.setfattr("dir1", "ceph.dir.pin", "1")
        # wait for the subtree to migrate to mds.1

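        # Keep creating (and deleting) the file until its inode number shows that it
        # was allocated by rank 1: each rank allocates inode numbers from its own
        # region of the inode space, so a sufficiently high number indicates mds.1
        # handed it out.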
        file_ino = 0
        while True:
            time.sleep(1)
            # allocate an inode from mds.1
            self.mount_a.run_shell(["touch", "dir1/file1"])
            file_ino = self.mount_a.path_to_ino("dir1/file1")
            if file_ino >= (2 << 40):
                break
            self.mount_a.run_shell(["rm", "-f", "dir1/file1"])

        self.mount_a.umount_wait()

        self.fs.mds_asok(["flush", "journal"], mds0_id)
        self.fs.mds_asok(["flush", "journal"], mds1_id)
        self.fs.fail()

        self.fs.radosm(["rm", "mds0_inotable"])
        self.fs.radosm(["rm", "mds1_inotable"])

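        # scan_links should recreate both ranks' inode tables, with the free ranges
        # starting above the inode numbers that are already in use.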
        self.fs.data_scan(["scan_links", "--filesystem", self.fs.name])

        mds0_inotable = json.loads(self.fs.table_tool([self.fs.name + ":0", "show", "inode"]))
        self.assertGreaterEqual(
            mds0_inotable['0']['data']['inotable']['free'][0]['start'], dir_ino)

        mds1_inotable = json.loads(self.fs.table_tool([self.fs.name + ":1", "show", "inode"]))
        self.assertGreaterEqual(
            mds1_inotable['1']['data']['inotable']['free'][0]['start'], file_ino)

        self.fs.set_joinable()
        self.fs.wait_for_daemons()

        out_json = self.fs.run_scrub(["start", "/dir1", "repair,recursive"])
        self.assertNotEqual(out_json, None)
        self.assertEqual(out_json["return_code"], 0)
        self.assertEqual(self.fs.wait_until_scrub_complete(tag=out_json["scrub_tag"]), True)

    def test_rebuild_snaptable(self):
        """
        The scan_links command repairs the snaptable
        """
        self.fs.set_allow_new_snaps(True)

        self.mount_a.run_shell(["mkdir", "dir1"])
        self.mount_a.run_shell(["mkdir", "dir1/.snap/s1"])
        self.mount_a.run_shell(["mkdir", "dir1/.snap/s2"])
        self.mount_a.run_shell(["rmdir", "dir1/.snap/s2"])

        self.mount_a.umount_wait()

        mds0_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["flush", "journal"], mds0_id)

        # wait for the mds to update the removed snaps
        time.sleep(10)

        old_snaptable = json.loads(self.fs.table_tool([self.fs.name + ":0", "show", "snap"]))
        # stamps may have minor differences
        for item in old_snaptable['snapserver']['snaps']:
            del item['stamp']

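        # Delete the snaptable object and have scan_links rebuild it; the rebuilt
        # table should contain the same snapshots (ignoring timestamps) and a
        # last_snap at least as high as before.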
        self.fs.radosm(["rm", "mds_snaptable"])
        self.fs.data_scan(["scan_links", "--filesystem", self.fs.name])

        new_snaptable = json.loads(self.fs.table_tool([self.fs.name + ":0", "show", "snap"]))
        for item in new_snaptable['snapserver']['snaps']:
            del item['stamp']
        self.assertGreaterEqual(
            new_snaptable['snapserver']['last_snap'], old_snaptable['snapserver']['last_snap'])
        self.assertEqual(
            new_snaptable['snapserver']['snaps'], old_snaptable['snapserver']['snaps'])

        out_json = self.fs.run_scrub(["start", "/dir1", "repair,recursive"])
        self.assertNotEqual(out_json, None)
        self.assertEqual(out_json["return_code"], 0)
        self.assertEqual(self.fs.wait_until_scrub_complete(tag=out_json["scrub_tag"]), True)

    def _prepare_extra_data_pool(self, set_root_layout=True):
        extra_data_pool_name = self.fs.get_data_pool_name() + '_extra'
        self.fs.add_data_pool(extra_data_pool_name)
        if set_root_layout:
            self.mount_a.setfattr(".", "ceph.dir.layout.pool",
                                  extra_data_pool_name)
        return extra_data_pool_name

    def test_extra_data_pool_rebuild_simple(self):
        self._prepare_extra_data_pool()
        self._rebuild_metadata(SimpleWorkload(self.fs, self.mount_a))

    def test_extra_data_pool_rebuild_few_files(self):
        self._prepare_extra_data_pool()
        self._rebuild_metadata(ManyFilesWorkload(self.fs, self.mount_a, 5), workers=1)

    @for_teuthology
    def test_extra_data_pool_rebuild_many_files_many_workers(self):
        self._prepare_extra_data_pool()
        self._rebuild_metadata(ManyFilesWorkload(self.fs, self.mount_a, 25), workers=7)

    def test_extra_data_pool_stashed_layout(self):
        pool_name = self._prepare_extra_data_pool(False)
        self._rebuild_metadata(StripedStashedLayout(self.fs, self.mount_a, pool_name))