"""
Test our tools for recovering metadata from the data pool
"""
import json
import logging
import os
import traceback
from collections import namedtuple, defaultdict
from textwrap import dedent

from teuthology.orchestra.run import CommandFailedError

from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology
log = logging.getLogger(__name__)


# One failed validation check: the exception that was raised, plus a short
# formatted traceback so we can see which check failed without decorating
# every check with a description string.
ValidationError = namedtuple("ValidationError", ["exception", "backtrace"])
class Workload(object):
    """
    Base class for a data-scan workload: write some files, (optionally)
    damage the filesystem pools, then validate that the files survived or
    were reconstructed.
    """
    def __init__(self, filesystem, mount):
        self._mount = mount
        self._filesystem = filesystem
        # Populated by write(): stat/ino info captured before damage, used
        # by validate() to compare against the recovered filesystem.
        self._initial_state = None

        # Accumulate backtraces for every failed validation, and return them. Backtraces
        # are rather verbose, but we only see them when something breaks, and they
        # let us see which check failed without having to decorate each check with
        # a string
        self._errors = []

    def assert_equal(self, a, b):
        """
        Record (rather than raise) a ValidationError if a != b, so that
        validation can continue and report every failure at once.
        """
        try:
            if a != b:
                raise AssertionError("{0} != {1}".format(a, b))
        except AssertionError as e:
            self._errors.append(
                ValidationError(e, traceback.format_exc(3))
            )

    def write(self):
        """
        Write the workload files to the mount
        """
        raise NotImplementedError()

    def validate(self):
        """
        Read from the mount and validate that the workload files are present (i.e. have
        survived or been reconstructed from the test scenario)
        """
        raise NotImplementedError()

    def damage(self):
        """
        Damage the filesystem pools in ways that will be interesting to recover from.  By
        default just wipe everything in the metadata pool
        """
        # Delete every object in the metadata pool
        objects = self._filesystem.rados(["ls"]).split("\n")
        for o in objects:
            self._filesystem.rados(["rm", o])

    def flush(self):
        """
        Called after client unmount, after write: flush whatever you want
        """
        self._filesystem.mds_asok(["flush", "journal"])
class SimpleWorkload(Workload):
    """
    Single file, single directory, check that it gets recovered and so does its size
    """
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def validate(self):
        self._mount.run_shell(["ls", "subdir"])
        st = self._mount.stat("subdir/sixmegs")
        # The rebuilt metadata must report the same size as before the wipe
        self.assert_equal(st['st_size'], self._initial_state['st_size'])
        return self._errors
class MovedFile(Workload):
    """
    A file moved after its backtrace was flushed: recovery should place it
    back at the location its (stale) backtrace claims.
    """
    def write(self):
        # Create a file whose backtrace disagrees with his eventual position
        # in the metadata.  We will see that he gets reconstructed in his
        # original position according to his backtrace.
        self._mount.run_shell(["mkdir", "subdir_alpha"])
        self._mount.run_shell(["mkdir", "subdir_bravo"])
        self._mount.write_n_mb("subdir_alpha/sixmegs", 6)
        # Flush now so the backtrace records subdir_alpha...
        self._filesystem.mds_asok(["flush", "journal"])
        # ...then move the file without flushing again
        self._mount.run_shell(["mv", "subdir_alpha/sixmegs", "subdir_bravo/sixmegs"])
        self._initial_state = self._mount.stat("subdir_bravo/sixmegs")

    def flush(self):
        # Never flush metadata, so the backtrace won't be updated after the move
        pass

    def validate(self):
        # The file should reappear where its backtrace pointed (alpha), not
        # where the client last saw it (bravo)
        self.assert_equal(self._mount.ls(), ["subdir_alpha"])
        st = self._mount.stat("subdir_alpha/sixmegs")
        self.assert_equal(st['st_size'], self._initial_state['st_size'])
        return self._errors
class BacktracelessFile(Workload):
    """
    A file whose backtrace was never written: recovery has no path for it,
    so it should land in lost+found named after its inode number.
    """
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def flush(self):
        # Never flush metadata, so backtrace won't be written
        pass

    def validate(self):
        # lost+found entries are named by hex inode number
        ino_name = "%x" % self._initial_state["st_ino"]

        # The inode should be linked into lost+found because we had no path for it
        self.assert_equal(self._mount.ls(), ["lost+found"])
        self.assert_equal(self._mount.ls("lost+found"), [ino_name])
        st = self._mount.stat("lost+found/{ino_name}".format(ino_name=ino_name))

        # We might not have got the name or path, but we should still get the size
        self.assert_equal(st['st_size'], self._initial_state['st_size'])

        return self._errors
class StripedStashedLayout(Workload):
    """
    Files in a directory with a striped layout: flushed files carry their
    layout in an xattr and should be recovered in place with correct data;
    an unflushed file has no layout xattr and should land in lost+found,
    readable only as junk.
    """
    def __init__(self, fs, m):
        super(StripedStashedLayout, self).__init__(fs, m)

        # Nice small stripes so we can quickly do our writes+validates
        # NOTE(review): these constants were lost in extraction; values taken
        # from upstream ceph qa/tasks/cephfs/test_data_scan.py -- confirm.
        self.sc = 4
        self.ss = 65536
        self.os = 262144

        # File sizes straddling object/stripe boundaries, to exercise the
        # size-recovery logic at its edge cases.
        self.interesting_sizes = [
            # Exactly stripe_count objects will exist
            self.os * self.sc,
            # Fewer than stripe_count objects will exist
            self.os * self.sc / 2,
            self.os * (self.sc - 1) + self.os / 2,
            self.os * (self.sc - 1) + self.os / 2 - 1,
            self.os * (self.sc + 1) + self.os / 2,
            self.os * (self.sc + 1) + self.os / 2 + 1,
            # More than stripe_count objects will exist
            self.os * self.sc + self.os * self.sc / 2
        ]

    def write(self):
        # Create a dir with a striped layout set on it
        self._mount.run_shell(["mkdir", "stripey"])

        self._mount.setfattr("./stripey", "ceph.dir.layout",
            "stripe_unit={ss} stripe_count={sc} object_size={os} pool={pool}".format(
                ss=self.ss, os=self.os, sc=self.sc,
                pool=self._filesystem.get_data_pool_name()
            ))

        # Write files, then flush metadata so that its layout gets written into an xattr
        for i, n_bytes in enumerate(self.interesting_sizes):
            self._mount.write_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
            # This is really just validating the validator
            self._mount.validate_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
        self._filesystem.mds_asok(["flush", "journal"])

        # Write another file in the same way, but this time don't flush the metadata,
        # so that it won't have the layout xattr
        self._mount.write_test_pattern("stripey/unflushed_file", 1024 * 512)
        self._mount.validate_test_pattern("stripey/unflushed_file", 1024 * 512)

        self._initial_state = {
            "unflushed_ino": self._mount.path_to_ino("stripey/unflushed_file")
        }

    def flush(self):
        # Pass because we already selectively flushed during write
        pass

    def validate(self):
        # The first files should have been recovered into its original location
        # with the correct layout: read back correct data
        for i, n_bytes in enumerate(self.interesting_sizes):
            try:
                self._mount.validate_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
            except CommandFailedError as e:
                self._errors.append(
                    ValidationError("File {0} (size {1}): {2}".format(i, n_bytes, e), traceback.format_exc(3))
                )

        # The unflushed file should have been recovered into lost+found without
        # the correct layout: read back junk
        ino_name = "%x" % self._initial_state["unflushed_ino"]
        self.assert_equal(self._mount.ls("lost+found"), [ino_name])
        try:
            self._mount.validate_test_pattern(os.path.join("lost+found", ino_name), 1024 * 512)
        except CommandFailedError:
            # Expected: without the layout xattr the data cannot validate
            pass
        else:
            self._errors.append(
                ValidationError("Unexpectedly valid data in unflushed striped file", "")
            )

        return self._errors
class ManyFilesWorkload(Workload):
    """
    A directory of `file_count` patterned files, used to exercise parallel
    (multi-worker) data-scan execution.
    """
    def __init__(self, filesystem, mount, file_count):
        super(ManyFilesWorkload, self).__init__(filesystem, mount)
        self.file_count = file_count

    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        for n in range(0, self.file_count):
            self._mount.write_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)

    def validate(self):
        for n in range(0, self.file_count):
            try:
                self._mount.validate_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)
            except CommandFailedError as e:
                # Collect per-file failures rather than aborting on the first
                self._errors.append(
                    ValidationError("File {0}: {1}".format(n, e), traceback.format_exc(3))
                )

        return self._errors
class MovedDir(Workload):
    """
    A directory moved between two parents, with files whose backtraces claim
    the two different locations: exactly one backtrace should win.
    """
    def write(self):
        # Create a nested dir that we will then move.  Two files with two different
        # backtraces referring to the moved dir, claiming two different locations for
        # it.  We will see that only one backtrace wins and the dir ends up with
        # one linkage.
        self._mount.run_shell(["mkdir", "-p", "grandmother/parent"])
        self._mount.write_n_mb("grandmother/parent/orig_pos_file", 1)
        self._filesystem.mds_asok(["flush", "journal"])
        self._mount.run_shell(["mkdir", "grandfather"])
        self._mount.run_shell(["mv", "grandmother/parent", "grandfather"])
        self._mount.write_n_mb("grandfather/parent/new_pos_file", 2)
        self._filesystem.mds_asok(["flush", "journal"])

        self._initial_state = (
            self._mount.stat("grandfather/parent/orig_pos_file"),
            self._mount.stat("grandfather/parent/new_pos_file")
        )

    def validate(self):
        # Only one of the two grandparents may survive, but either is a
        # legal outcome: whichever backtrace won
        root_files = self._mount.ls()
        self.assert_equal(len(root_files), 1)
        self.assert_equal(root_files[0] in ["grandfather", "grandmother"], True)
        winner = root_files[0]
        st_opf = self._mount.stat("{0}/parent/orig_pos_file".format(winner))
        st_npf = self._mount.stat("{0}/parent/new_pos_file".format(winner))

        self.assert_equal(st_opf['st_size'], self._initial_state[0]['st_size'])
        self.assert_equal(st_npf['st_size'], self._initial_state[1]['st_size'])

        # Bug fix: previously this validate() returned None, so any errors
        # recorded above were silently dropped by _rebuild_metadata
        return self._errors
class MissingZerothObject(Workload):
    """
    A file whose zeroth data object (the one carrying the backtrace xattr)
    is destroyed along with the metadata pool: it should be recovered into
    lost+found with its size intact.
    """
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def damage(self):
        # Wipe the metadata pool as usual...
        super(MissingZerothObject, self).damage()
        # ...and additionally remove the file's zeroth object from the data pool
        zeroth_id = "{0:x}.00000000".format(self._initial_state['st_ino'])
        self._filesystem.rados(["rm", zeroth_id], pool=self._filesystem.get_data_pool_name())

    def validate(self):
        st = self._mount.stat("lost+found/{0:x}".format(self._initial_state['st_ino']))
        self.assert_equal(st['st_size'], self._initial_state['st_size'])
        # Bug fix: return accumulated errors so the caller can report them
        # (previously returned None, silently passing validation)
        return self._errors
class NonDefaultLayout(Workload):
    """
    Check that the reconstruction copes with files that have a different
    object size in their layout
    """
    def write(self):
        self._mount.run_shell(["touch", "datafile"])
        self._mount.setfattr("./datafile", "ceph.file.layout.object_size", "8388608")
        self._mount.run_shell(["dd", "if=/dev/urandom", "of=./datafile", "bs=1M", "count=32"])
        self._initial_state = self._mount.stat("datafile")

    def validate(self):
        # Check we got the layout reconstructed properly
        object_size = int(self._mount.getfattr(
            "./datafile", "ceph.file.layout.object_size"))
        self.assert_equal(object_size, 8388608)

        # Check we got the file size reconstructed properly
        st = self._mount.stat("datafile")
        self.assert_equal(st['st_size'], self._initial_state['st_size'])

        # Bug fix: return accumulated errors so the caller can report them
        # (previously returned None, silently passing validation)
        return self._errors
class TestDataScan(CephFSTestCase):
    """
    Exercise cephfs-data-scan: rebuild a wiped metadata pool (optionally into
    an alternate pool) from data-pool contents, and verify dentry injection,
    pg_files and scan_links behaviour.
    """
    MDSS_REQUIRED = 2

    def is_marked_damaged(self, rank):
        # True if `rank` appears in the MDS map's damaged set
        mds_map = self.fs.get_mds_map()
        return rank in mds_map['damaged']

    def _rebuild_metadata(self, workload, other_pool=None, workers=1):
        """
        That when all objects in metadata pool are removed, we can rebuild a metadata pool
        based on the contents of a data pool, and a client can see and read our files.

        :param workload: a Workload instance to write, damage and validate
        :param other_pool: if set, recover into this alternate metadata pool
                           (a filesystem named other_pool + '-fs' is created)
        :param workers: worker_count passed to cephfs-data-scan
        """

        # First, inject some files
        workload.write()

        other_fs = other_pool + '-fs' if other_pool else None

        # Unmount the client and flush the journal: the tool should also cope with
        # situations where there is dirty metadata, but we'll test that separately
        self.mount_a.umount_wait()
        workload.flush()

        # Create the alternate pool if requested
        if other_pool:
            self.fs.rados(['mkpool', other_pool])
            self.fs.mon_manager.raw_cluster_cmd('fs', 'flag', 'set',
                                                'enable_multiple', 'true',
                                                '--yes-i-really-mean-it')
            self.fs.mon_manager.raw_cluster_cmd('fs', 'new', other_fs,
                                                other_pool,
                                                self.fs.get_data_pool_name(),
                                                '--allow-dangerous-metadata-overlay')
            self.fs.data_scan(['init', '--force-init', '--filesystem',
                               other_fs, '--alternate-pool', other_pool])
            self.fs.mon_manager.raw_cluster_cmd('-s')
            self.fs.table_tool([other_fs + ":0", "reset", "session"])
            self.fs.table_tool([other_fs + ":0", "reset", "snap"])
            self.fs.table_tool([other_fs + ":0", "reset", "inode"])

        # Stop the MDS
        self.fs.mds_stop()
        self.fs.mds_fail()

        # After recovery, we need the MDS to not be strict about stats (in production these options
        # are off by default, but in QA we need to explicitly disable them)
        self.fs.set_ceph_conf('mds', 'mds verify scatter', False)
        self.fs.set_ceph_conf('mds', 'mds debug scatterstat', False)

        # Apply any data damage the workload wants
        workload.damage()

        # Reset the MDS map in case multiple ranks were in play: recovery procedure
        # only understands how to rebuild metadata under rank 0
        self.fs.mon_manager.raw_cluster_cmd('fs', 'reset', self.fs.name,
                                            '--yes-i-really-mean-it')

        if other_pool is None:
            self.fs.mds_restart()

        def get_state(mds_id):
            # Current MDS daemon state string, or None if not in the map
            info = self.mds_cluster.get_mds_info(mds_id)
            return info['state'] if info is not None else None

        if other_pool is None:
            # Rank 0 must be marked damaged and all daemons settle to standby
            self.wait_until_true(lambda: self.is_marked_damaged(0), 60)
            for mds_id in self.fs.mds_ids:
                self.wait_until_equal(
                        lambda: get_state(mds_id),
                        "up:standby",
                        timeout=60)

        self.fs.table_tool([self.fs.name + ":0", "reset", "session"])
        self.fs.table_tool([self.fs.name + ":0", "reset", "snap"])
        self.fs.table_tool([self.fs.name + ":0", "reset", "inode"])

        # Run the recovery procedure
        # NOTE(review): this check is deliberately disabled upstream -- confirm
        if False:
            with self.assertRaises(CommandFailedError):
                # Normal reset should fail when no objects are present, we'll use --force instead
                self.fs.journal_tool(["journal", "reset"])

        if other_pool:
            # Rebuild into the alternate metadata pool, then also refresh the
            # original filesystem's metadata so both are consistent
            self.fs.mds_stop()
            self.fs.data_scan(['scan_extents', '--alternate-pool',
                               other_pool, '--filesystem', self.fs.name,
                               self.fs.get_data_pool_name()])
            self.fs.data_scan(['scan_inodes', '--alternate-pool',
                               other_pool, '--filesystem', self.fs.name,
                               '--force-corrupt', '--force-init',
                               self.fs.get_data_pool_name()])
            self.fs.journal_tool(['--rank=' + self.fs.name + ":0", 'event',
                                  'recover_dentries', 'list',
                                  '--alternate-pool', other_pool])

            self.fs.data_scan(['init', '--force-init', '--filesystem',
                               self.fs.name])
            self.fs.data_scan(['scan_inodes', '--filesystem', self.fs.name,
                               '--force-corrupt', '--force-init',
                               self.fs.get_data_pool_name()])
            self.fs.journal_tool(['--rank=' + self.fs.name + ":0", 'event',
                                  'recover_dentries', 'list'])

            self.fs.journal_tool(['--rank=' + other_fs + ":0", 'journal',
                                  'reset', '--force'])
            self.fs.journal_tool(['--rank=' + self.fs.name + ":0", 'journal',
                                  'reset', '--force'])
            self.fs.mon_manager.raw_cluster_cmd('mds', 'repaired',
                                                other_fs + ":0")
        else:
            self.fs.journal_tool(["journal", "reset", "--force"])
            self.fs.data_scan(["init"])
            self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()], worker_count=workers)
            self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()], worker_count=workers)

        # Mark the MDS repaired
        self.fs.mon_manager.raw_cluster_cmd('mds', 'repaired', '0')

        # Start the MDS
        self.fs.mds_restart()
        self.fs.wait_for_daemons()
        if other_pool:
            # Scrub the recovered filesystem with verbose MDS logging
            for mds_id in self.fs.mds_ids:
                self.fs.mon_manager.raw_cluster_cmd('tell', "mds." + mds_id,
                                                    'injectargs', '--debug-mds=20')
                self.fs.mon_manager.raw_cluster_cmd('daemon', "mds." + mds_id,
                                                    'scrub_path', '/',
                                                    'recursive', 'repair')
        log.info(str(self.mds_cluster.status()))

        # Mount a client
        if other_pool:
            self.mount_a.mount(mount_fs_name=other_fs)
        else:
            self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # See that the files are present and correct
        errors = workload.validate()
        if errors:
            log.error("Validation errors found: {0}".format(len(errors)))
            for e in errors:
                log.error(e.exception)
                log.error(e.backtrace)
            raise AssertionError("Validation failed, first error: {0}\n{1}".format(
                errors[0].exception, errors[0].backtrace
            ))

    def test_rebuild_simple(self):
        self._rebuild_metadata(SimpleWorkload(self.fs, self.mount_a))

    def test_rebuild_moved_file(self):
        self._rebuild_metadata(MovedFile(self.fs, self.mount_a))

    def test_rebuild_backtraceless(self):
        self._rebuild_metadata(BacktracelessFile(self.fs, self.mount_a))

    def test_rebuild_moved_dir(self):
        self._rebuild_metadata(MovedDir(self.fs, self.mount_a))

    def test_rebuild_missing_zeroth(self):
        self._rebuild_metadata(MissingZerothObject(self.fs, self.mount_a))

    def test_rebuild_nondefault_layout(self):
        self._rebuild_metadata(NonDefaultLayout(self.fs, self.mount_a))

    def test_stashed_layout(self):
        self._rebuild_metadata(StripedStashedLayout(self.fs, self.mount_a))

    def test_rebuild_simple_altpool(self):
        self._rebuild_metadata(SimpleWorkload(self.fs, self.mount_a), other_pool="recovery")

    def _dirfrag_keys(self, object_id):
        """
        Return the list of omap keys on a dirfrag object, or [] if none.
        """
        self.other_pool = 'recovery'
        self.other_fs = self.other_pool + '-fs'
        keys_str = self.fs.rados(["listomapkeys", object_id])
        if keys_str:
            return keys_str.split("\n")
        else:
            return []

    def test_fragmented_injection(self):
        """
        That when injecting a dentry into a fragmented directory, we put it in the right fragment.
        """

        self.fs.set_allow_dirfrags(True)

        file_count = 100
        file_names = ["%s" % n for n in range(0, file_count)]

        # Create a directory of `file_count` files, each named after its
        # decimal number and containing the string of its decimal number
        self.mount_a.run_python(dedent("""
        import os
        path = os.path.join("{path}", "subdir")
        os.mkdir(path)
        for n in range(0, {file_count}):
            open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
        """.format(
            path=self.mount_a.mountpoint,
            file_count=file_count
        )))

        dir_ino = self.mount_a.path_to_ino("subdir")

        # Only one MDS should be active!
        self.assertEqual(len(self.fs.get_active_names()), 1)

        # Ensure that one directory is fragmented
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir", "0/0", "1"], mds_id)

        # Flush journal and stop MDS
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.fs.mds_stop()
        self.fs.mds_fail()

        # Pick a dentry and wipe out its key
        # Because I did a 1 bit split, I know one frag will be named <inode>.01000000
        frag_obj_id = "{0:x}.01000000".format(dir_ino)
        keys = self._dirfrag_keys(frag_obj_id)
        victim_key = keys[7]  # arbitrary choice
        log.info("victim_key={0}".format(victim_key))
        victim_dentry = victim_key.split("_head")[0]
        self.fs.rados(["rmomapkey", frag_obj_id, victim_key])

        # Start filesystem back up, observe that the file appears to be gone in an `ls`
        self.fs.mds_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        files = self.mount_a.run_shell(["ls", "subdir/"]).stdout.getvalue().strip().split("\n")
        self.assertListEqual(sorted(files), sorted(list(set(file_names) - set([victim_dentry]))))

        # Stop the filesystem
        self.mount_a.umount_wait()
        self.fs.mds_stop()
        self.fs.mds_fail()

        # Run data-scan, observe that it inserts our dentry back into the correct fragment
        # by checking the omap now has the dentry's key again
        self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()])
        self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()])
        self.assertIn(victim_key, self._dirfrag_keys(frag_obj_id))

        # Start the filesystem and check that the dentry we deleted is now once again visible
        # and points to the correct file data.
        self.fs.mds_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        out = self.mount_a.run_shell(["cat", "subdir/{0}".format(victim_dentry)]).stdout.getvalue().strip()
        self.assertEqual(out, victim_dentry)

        # Finally, close the loop by checking our injected dentry survives a merge
        mds_id = self.fs.get_active_names()[0]
        self.mount_a.ls("subdir")  # Do an ls to ensure both frags are in cache so the merge will work
        self.fs.mds_asok(["dirfrag", "merge", "/subdir", "0/0"], mds_id)
        self.fs.mds_asok(["flush", "journal"], mds_id)
        frag_obj_id = "{0:x}.00000000".format(dir_ino)
        keys = self._dirfrag_keys(frag_obj_id)
        self.assertListEqual(sorted(keys), sorted(["%s_head" % f for f in file_names]))

    @for_teuthology
    def test_parallel_execution(self):
        self._rebuild_metadata(ManyFilesWorkload(self.fs, self.mount_a, 25), workers=7)

    def test_pg_files(self):
        """
        That the pg files command tells us which files are associated with
        a particular PG
        """
        file_count = 20
        self.mount_a.run_shell(["mkdir", "mydir"])
        self.mount_a.create_n_files("mydir/myfile", file_count)

        # Some files elsewhere in the system that we will ignore
        # to check that the tool is filtering properly
        self.mount_a.run_shell(["mkdir", "otherdir"])
        self.mount_a.create_n_files("otherdir/otherfile", file_count)

        pgs_to_files = defaultdict(list)
        # Rough (slow) reimplementation of the logic
        for i in range(0, file_count):
            file_path = "mydir/myfile_{0}".format(i)
            ino = self.mount_a.path_to_ino(file_path)
            obj = "{0:x}.{1:08x}".format(ino, 0)
            pgid = json.loads(self.fs.mon_manager.raw_cluster_cmd(
                "osd", "map", self.fs.get_data_pool_name(), obj,
                "--format=json-pretty"
            ))['pgid']
            pgs_to_files[pgid].append(file_path)
            log.info("{0}: {1}".format(file_path, pgid))

        pg_count = self.fs.get_pgs_per_fs_pool()
        for pg_n in range(0, pg_count):
            pg_str = "{0}.{1}".format(self.fs.get_data_pool_id(), pg_n)
            out = self.fs.data_scan(["pg_files", "mydir", pg_str])
            lines = [l for l in out.split("\n") if l]
            log.info("{0}: {1}".format(pg_str, lines))
            self.assertSetEqual(set(lines), set(pgs_to_files[pg_str]))

    def test_scan_links(self):
        """
        The scan_links command fixes linkage errors
        """
        self.mount_a.run_shell(["mkdir", "testdir1"])
        self.mount_a.run_shell(["mkdir", "testdir2"])
        dir1_ino = self.mount_a.path_to_ino("testdir1")
        dir2_ino = self.mount_a.path_to_ino("testdir2")
        dirfrag1_oid = "{0:x}.00000000".format(dir1_ino)
        dirfrag2_oid = "{0:x}.00000000".format(dir2_ino)

        self.mount_a.run_shell(["touch", "testdir1/file1"])
        self.mount_a.run_shell(["ln", "testdir1/file1", "testdir1/link1"])
        self.mount_a.run_shell(["ln", "testdir1/file1", "testdir2/link2"])

        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["flush", "journal"], mds_id)

        dirfrag1_keys = self._dirfrag_keys(dirfrag1_oid)

        # introduce duplicated primary link
        file1_key = "file1_head"
        self.assertIn(file1_key, dirfrag1_keys)
        file1_omap_data = self.fs.rados(["getomapval", dirfrag1_oid, file1_key, '-'])
        self.fs.rados(["setomapval", dirfrag2_oid, file1_key], stdin_data=file1_omap_data)
        self.assertIn(file1_key, self._dirfrag_keys(dirfrag2_oid))

        # remove a remote link, make inode link count incorrect
        link1_key = 'link1_head'
        self.assertIn(link1_key, dirfrag1_keys)
        self.fs.rados(["rmomapkey", dirfrag1_oid, link1_key])

        # increase good primary link's version
        self.mount_a.run_shell(["touch", "testdir1/file1"])
        self.mount_a.umount_wait()

        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.fs.mds_stop()
        self.fs.mds_fail()

        # repair linkage errors
        self.fs.data_scan(["scan_links"])

        # primary link in testdir2 was deleted?
        self.assertNotIn(file1_key, self._dirfrag_keys(dirfrag2_oid))

        self.fs.mds_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # link count was adjusted?
        file1_nlink = self.mount_a.path_to_nlink("testdir1/file1")
        self.assertEqual(file1_nlink, 2)