
"""
Test our tools for recovering metadata from the data pool
"""
import json

import logging
import os
from textwrap import dedent
import traceback
from collections import namedtuple, defaultdict

from teuthology.orchestra.run import CommandFailedError
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology

log = logging.getLogger(__name__)


ValidationError = namedtuple("ValidationError", ["exception", "backtrace"])


class Workload(object):
    def __init__(self, filesystem, mount):
        self._mount = mount
        self._filesystem = filesystem
        self._initial_state = None

        # Accumulate backtraces for every failed validation, and return them.  Backtraces
        # are rather verbose, but we only see them when something breaks, and they
        # let us see which check failed without having to decorate each check with
        # a string
        self._errors = []

    def assert_equal(self, a, b):
        try:
            if a != b:
                raise AssertionError("{0} != {1}".format(a, b))
        except AssertionError as e:
            self._errors.append(
                ValidationError(e, traceback.format_exc(3))
            )

    def write(self):
        """
        Write the workload files to the mount
        """
        raise NotImplementedError()

    def validate(self):
        """
        Read from the mount and validate that the workload files are present (i.e. have
        survived or been reconstructed from the test scenario)
        """
        raise NotImplementedError()

    def damage(self):
        """
        Damage the filesystem pools in ways that will be interesting to recover from.  By
        default just wipe everything in the metadata pool
        """
        # Delete every object in the metadata pool
        objects = self._filesystem.rados(["ls"]).split("\n")
        for o in objects:
            self._filesystem.rados(["rm", o])
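
        # Background (roughly): the metadata pool holds the MDS journal, the
        # dirfrag omaps and the session/snap/inode tables, while file contents
        # (and the backtrace xattrs that cephfs-data-scan relies on) live in
        # the data pool.  Wiping the metadata pool therefore still leaves the
        # recovery tools enough information in the data pool to rebuild a
        # usable tree.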

    def flush(self):
        """
        Called after client unmount, after write: flush whatever you want
        """
        self._filesystem.mds_asok(["flush", "journal"])


class SimpleWorkload(Workload):
    """
    Single file, single directory, check that it gets recovered and so does its size
    """
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def validate(self):
        self._mount.run_shell(["ls", "subdir"])
        st = self._mount.stat("subdir/sixmegs")
        self.assert_equal(st['st_size'], self._initial_state['st_size'])
        return self._errors


class MovedFile(Workload):
    def write(self):
        # Create a file whose backtrace disagrees with its eventual position
        # in the metadata.  We will see that it gets reconstructed in its
        # original position according to its backtrace.
        self._mount.run_shell(["mkdir", "subdir_alpha"])
        self._mount.run_shell(["mkdir", "subdir_bravo"])
        self._mount.write_n_mb("subdir_alpha/sixmegs", 6)
        self._filesystem.mds_asok(["flush", "journal"])
        self._mount.run_shell(["mv", "subdir_alpha/sixmegs", "subdir_bravo/sixmegs"])
        self._initial_state = self._mount.stat("subdir_bravo/sixmegs")

    def flush(self):
        pass

    def validate(self):
        self.assert_equal(self._mount.ls(), ["subdir_alpha"])
        st = self._mount.stat("subdir_alpha/sixmegs")
        self.assert_equal(st['st_size'], self._initial_state['st_size'])
        return self._errors


class BacktracelessFile(Workload):
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def flush(self):
        # Never flush metadata, so backtrace won't be written
        pass
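
        # Roughly speaking, the MDS writes an inode's backtrace (its ancestor
        # path) into the "parent" xattr of the file's first data object, but
        # only lazily when journalled metadata is flushed.  Skipping the flush
        # here means the data objects carry no backtrace, so data-scan cannot
        # place the file at its original path and is expected to link it into
        # lost+found instead.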

    def validate(self):
        ino_name = "%x" % self._initial_state["st_ino"]

        # The inode should be linked into lost+found because we had no path for it
        self.assert_equal(self._mount.ls(), ["lost+found"])
        self.assert_equal(self._mount.ls("lost+found"), [ino_name])
        st = self._mount.stat("lost+found/{ino_name}".format(ino_name=ino_name))

        # We might not have got the name or path, but we should still get the size
        self.assert_equal(st['st_size'], self._initial_state['st_size'])

        return self._errors


class StripedStashedLayout(Workload):
    def __init__(self, fs, m):
        super(StripedStashedLayout, self).__init__(fs, m)

        # Nice small stripes so we can quickly do our writes+validates
        self.sc = 4
        self.ss = 65536
        self.os = 262144

        self.interesting_sizes = [
            # Exactly stripe_count objects will exist
            self.os * self.sc,
            # Fewer than stripe_count objects will exist
            self.os * self.sc / 2,
            self.os * (self.sc - 1) + self.os / 2,
            self.os * (self.sc - 1) + self.os / 2 - 1,
            self.os * (self.sc + 1) + self.os / 2,
            self.os * (self.sc + 1) + self.os / 2 + 1,
            # More than stripe_count objects will exist
            self.os * self.sc + self.os * self.sc / 2
        ]
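
        # How these sizes map onto objects (roughly): data is written in
        # stripe_unit-sized chunks round-robin across a set of stripe_count
        # objects, and each object grows to object_size before the next set of
        # objects is started.  So with object_size=262144 and stripe_count=4,
        # a file of exactly os*sc = 1048576 bytes fills one complete set of
        # four objects, the smaller sizes leave some objects short or absent,
        # and the larger sizes spill over into a second set.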

    def write(self):
        # Create a dir with a striped layout set on it
        self._mount.run_shell(["mkdir", "stripey"])

        self._mount.setfattr("./stripey", "ceph.dir.layout",
            "stripe_unit={ss} stripe_count={sc} object_size={os} pool={pool}".format(
                ss=self.ss, os=self.os, sc=self.sc,
                pool=self._filesystem.get_data_pool_name()
            ))
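
        # The layout string above is the same format one would pass from a
        # shell on the client, e.g. (illustrative, pool name is an example):
        #   setfattr -n ceph.dir.layout \
        #       -v "stripe_unit=65536 stripe_count=4 object_size=262144 pool=cephfs_data" stripey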

        # Write files, then flush metadata so that its layout gets written into an xattr
        for i, n_bytes in enumerate(self.interesting_sizes):
            self._mount.write_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
            # This is really just validating the validator
            self._mount.validate_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
        self._filesystem.mds_asok(["flush", "journal"])

        # Write another file in the same way, but this time don't flush the metadata,
        # so that it won't have the layout xattr
        self._mount.write_test_pattern("stripey/unflushed_file", 1024 * 512)
        self._mount.validate_test_pattern("stripey/unflushed_file", 1024 * 512)

        self._initial_state = {
            "unflushed_ino": self._mount.path_to_ino("stripey/unflushed_file")
        }

    def flush(self):
        # Pass because we already selectively flushed during write
        pass

    def validate(self):
        # The flushed files should have been recovered into their original locations
        # with the correct layout: read back correct data
        for i, n_bytes in enumerate(self.interesting_sizes):
            try:
                self._mount.validate_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
            except CommandFailedError as e:
                self._errors.append(
                    ValidationError("File {0} (size {1}): {2}".format(i, n_bytes, e), traceback.format_exc(3))
                )

        # The unflushed file should have been recovered into lost+found without
        # the correct layout: read back junk
        ino_name = "%x" % self._initial_state["unflushed_ino"]
        self.assert_equal(self._mount.ls("lost+found"), [ino_name])
        try:
            self._mount.validate_test_pattern(os.path.join("lost+found", ino_name), 1024 * 512)
        except CommandFailedError:
            pass
        else:
            self._errors.append(
                ValidationError("Unexpectedly valid data in unflushed striped file", "")
            )

        return self._errors


class ManyFilesWorkload(Workload):
    def __init__(self, filesystem, mount, file_count):
        super(ManyFilesWorkload, self).__init__(filesystem, mount)
        self.file_count = file_count

    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        for n in range(0, self.file_count):
            self._mount.write_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)

    def validate(self):
        for n in range(0, self.file_count):
            try:
                self._mount.validate_test_pattern("subdir/{0}".format(n), 6 * 1024 * 1024)
            except CommandFailedError as e:
                self._errors.append(
                    ValidationError("File {0}: {1}".format(n, e), traceback.format_exc(3))
                )

        return self._errors


class MovedDir(Workload):
    def write(self):
        # Create a nested dir that we will then move.  Two files with two different
        # backtraces referring to the moved dir, claiming two different locations for
        # it.  We will see that only one backtrace wins and the dir ends up with
        # single linkage.
        self._mount.run_shell(["mkdir", "-p", "grandmother/parent"])
        self._mount.write_n_mb("grandmother/parent/orig_pos_file", 1)
        self._filesystem.mds_asok(["flush", "journal"])
        self._mount.run_shell(["mkdir", "grandfather"])
        self._mount.run_shell(["mv", "grandmother/parent", "grandfather"])
        self._mount.write_n_mb("grandfather/parent/new_pos_file", 2)
        self._filesystem.mds_asok(["flush", "journal"])

        self._initial_state = (
            self._mount.stat("grandfather/parent/orig_pos_file"),
            self._mount.stat("grandfather/parent/new_pos_file")
        )

    def validate(self):
        root_files = self._mount.ls()
        self.assert_equal(len(root_files), 1)
        self.assert_equal(root_files[0] in ["grandfather", "grandmother"], True)
        winner = root_files[0]
        st_opf = self._mount.stat("{0}/parent/orig_pos_file".format(winner))
        st_npf = self._mount.stat("{0}/parent/new_pos_file".format(winner))

        self.assert_equal(st_opf['st_size'], self._initial_state[0]['st_size'])
        self.assert_equal(st_npf['st_size'], self._initial_state[1]['st_size'])


class MissingZerothObject(Workload):
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def damage(self):
        super(MissingZerothObject, self).damage()
        zeroth_id = "{0:x}.00000000".format(self._initial_state['st_ino'])
        self._filesystem.rados(["rm", zeroth_id], pool=self._filesystem.get_data_pool_name())
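
        # Data objects are named "<inode number in hex>.<object index as 8 hex
        # digits>", so this removes the file's first ("zeroth") object.  That
        # object is also where the backtrace xattr lives, so recovery has to
        # cope with both the missing data and the missing backtrace.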

    def validate(self):
        st = self._mount.stat("lost+found/{0:x}".format(self._initial_state['st_ino']))
        self.assert_equal(st['st_size'], self._initial_state['st_size'])


class NonDefaultLayout(Workload):
    """
    Check that the reconstruction copes with files that have a different
    object size in their layout
    """
    def write(self):
        self._mount.run_shell(["touch", "datafile"])
        self._mount.setfattr("./datafile", "ceph.file.layout.object_size", "8388608")
        self._mount.run_shell(["dd", "if=/dev/urandom", "of=./datafile", "bs=1M", "count=32"])
        self._initial_state = self._mount.stat("datafile")

    def validate(self):
        # Check we got the layout reconstructed properly
        object_size = int(self._mount.getfattr(
            "./datafile", "ceph.file.layout.object_size"))
        self.assert_equal(object_size, 8388608)

        # Check we got the file size reconstructed properly
        st = self._mount.stat("datafile")
        self.assert_equal(st['st_size'], self._initial_state['st_size'])


class TestDataScan(CephFSTestCase):
    MDSS_REQUIRED = 2

    def is_marked_damaged(self, rank):
        mds_map = self.fs.get_mds_map()
        return rank in mds_map['damaged']

    def _rebuild_metadata(self, workload, other_pool=None, workers=1):
313 """
314 That when all objects in metadata pool are removed, we can rebuild a metadata pool
315 based on the contents of a data pool, and a client can see and read our files.
316 """

        # First, inject some files

        other_fs = other_pool + '-fs' if other_pool else None
        workload.write()

        # Unmount the client and flush the journal: the tool should also cope with
        # situations where there is dirty metadata, but we'll test that separately
        self.mount_a.umount_wait()
        workload.flush()

        # Create the alternate pool if requested
        if other_pool:
            self.fs.rados(['mkpool', other_pool])
            self.fs.mon_manager.raw_cluster_cmd('fs', 'flag', 'set',
                                                'enable_multiple', 'true',
                                                '--yes-i-really-mean-it')
            self.fs.mon_manager.raw_cluster_cmd('fs', 'new', other_fs,
                                                other_pool,
                                                self.fs.get_data_pool_name(),
                                                '--allow-dangerous-metadata-overlay')
            self.fs.data_scan(['init', '--force-init', '--filesystem',
                               other_fs, '--alternate-pool', other_pool])
            self.fs.mon_manager.raw_cluster_cmd('-s')
            self.fs.table_tool([other_fs + ":0", "reset", "session"])
            self.fs.table_tool([other_fs + ":0", "reset", "snap"])
            self.fs.table_tool([other_fs + ":0", "reset", "inode"])
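
            # Sketch of the alternate-pool approach: rather than rebuilding in
            # place, metadata is reconstructed into a brand new pool attached
            # to a second filesystem ("<other_pool>-fs"), so the original
            # metadata pool is never modified.  The table resets above give
            # that filesystem clean session/snap/inode tables to start from.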

        # Stop the MDS
        self.fs.mds_stop()
        self.fs.mds_fail()

        # After recovery, we need the MDS to not be strict about stats (in production these options
        # are off by default, but in QA we need to explicitly disable them)
        self.fs.set_ceph_conf('mds', 'mds verify scatter', False)
        self.fs.set_ceph_conf('mds', 'mds debug scatterstat', False)

        # Apply any data damage the workload wants
        workload.damage()

        # Reset the MDS map in case multiple ranks were in play: recovery procedure
        # only understands how to rebuild metadata under rank 0
        self.fs.mon_manager.raw_cluster_cmd('fs', 'reset', self.fs.name,
                                            '--yes-i-really-mean-it')

        if other_pool is None:
            self.fs.mds_restart()

        def get_state(mds_id):
            info = self.mds_cluster.get_mds_info(mds_id)
            return info['state'] if info is not None else None

        if other_pool is None:
            self.wait_until_true(lambda: self.is_marked_damaged(0), 60)
            for mds_id in self.fs.mds_ids:
                self.wait_until_equal(
                    lambda: get_state(mds_id),
                    "up:standby",
                    timeout=60)

        self.fs.table_tool([self.fs.name + ":0", "reset", "session"])
        self.fs.table_tool([self.fs.name + ":0", "reset", "snap"])
        self.fs.table_tool([self.fs.name + ":0", "reset", "inode"])

        # Run the recovery procedure
        if False:
            with self.assertRaises(CommandFailedError):
                # Normal reset should fail when no objects are present, we'll use --force instead
                self.fs.journal_tool(["journal", "reset"])

        if other_pool:
            self.fs.mds_stop()
            self.fs.data_scan(['scan_extents', '--alternate-pool',
                               other_pool, '--filesystem', self.fs.name,
                               self.fs.get_data_pool_name()])
            self.fs.data_scan(['scan_inodes', '--alternate-pool',
                               other_pool, '--filesystem', self.fs.name,
                               '--force-corrupt', '--force-init',
                               self.fs.get_data_pool_name()])
            self.fs.journal_tool(['--rank=' + self.fs.name + ":0", 'event',
                                  'recover_dentries', 'list',
                                  '--alternate-pool', other_pool])

            self.fs.data_scan(['init', '--force-init', '--filesystem',
                               self.fs.name])
            self.fs.data_scan(['scan_inodes', '--filesystem', self.fs.name,
                               '--force-corrupt', '--force-init',
                               self.fs.get_data_pool_name()])
            self.fs.journal_tool(['--rank=' + self.fs.name + ":0", 'event',
                                  'recover_dentries', 'list'])

            self.fs.journal_tool(['--rank=' + other_fs + ":0", 'journal',
                                  'reset', '--force'])
            self.fs.journal_tool(['--rank=' + self.fs.name + ":0", 'journal',
                                  'reset', '--force'])
            self.fs.mon_manager.raw_cluster_cmd('mds', 'repaired',
                                                other_fs + ":0")
        else:
            self.fs.journal_tool(["journal", "reset", "--force"])
            self.fs.data_scan(["init"])
            self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()], worker_count=workers)
            self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()], worker_count=workers)
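
        # For reference, the equivalent recovery with the bare CLI tools would
        # look roughly like this (pool name is an example):
        #   cephfs-journal-tool journal reset --force
        #   cephfs-data-scan init
        #   cephfs-data-scan scan_extents cephfs_data
        #   cephfs-data-scan scan_inodes cephfs_data
        #   ceph mds repaired 0
        # scan_extents accumulates size/mtime hints from the data objects onto
        # each file's zeroth object, and scan_inodes then injects the recovered
        # inodes and dentries back into the metadata pool.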

        # Mark the MDS repaired
        self.fs.mon_manager.raw_cluster_cmd('mds', 'repaired', '0')

        # Start the MDS
        self.fs.mds_restart()
        self.fs.wait_for_daemons()
        if other_pool:
            for mds_id in self.fs.mds_ids:
                self.fs.mon_manager.raw_cluster_cmd('tell', "mds." + mds_id,
                                                    'injectargs', '--debug-mds=20')
                self.fs.mon_manager.raw_cluster_cmd('daemon', "mds." + mds_id,
                                                    'scrub_path', '/',
                                                    'recursive', 'repair')
        log.info(str(self.mds_cluster.status()))

        # Mount a client
        self.mount_a.mount(mount_fs_name=other_fs)
        self.mount_a.wait_until_mounted()

        # See that the files are present and correct
        errors = workload.validate()
        if errors:
            log.error("Validation errors found: {0}".format(len(errors)))
            for e in errors:
                log.error(e.exception)
                log.error(e.backtrace)
            raise AssertionError("Validation failed, first error: {0}\n{1}".format(
                errors[0].exception, errors[0].backtrace
            ))

    def test_rebuild_simple(self):
        self._rebuild_metadata(SimpleWorkload(self.fs, self.mount_a))

    def test_rebuild_moved_file(self):
        self._rebuild_metadata(MovedFile(self.fs, self.mount_a))

    def test_rebuild_backtraceless(self):
        self._rebuild_metadata(BacktracelessFile(self.fs, self.mount_a))

    def test_rebuild_moved_dir(self):
        self._rebuild_metadata(MovedDir(self.fs, self.mount_a))

    def test_rebuild_missing_zeroth(self):
        self._rebuild_metadata(MissingZerothObject(self.fs, self.mount_a))

    def test_rebuild_nondefault_layout(self):
        self._rebuild_metadata(NonDefaultLayout(self.fs, self.mount_a))

    def test_stashed_layout(self):
        self._rebuild_metadata(StripedStashedLayout(self.fs, self.mount_a))

    def test_rebuild_simple_altpool(self):
        self._rebuild_metadata(SimpleWorkload(self.fs, self.mount_a), other_pool="recovery")

    def _dirfrag_keys(self, object_id):
        self.other_pool = 'recovery'
        self.other_fs = self.other_pool + '-fs'
        keys_str = self.fs.rados(["listomapkeys", object_id])
        if keys_str:
            return keys_str.split("\n")
        else:
            return []

    def test_fragmented_injection(self):
        """
        That when injecting a dentry into a fragmented directory, we put it in the right fragment.
        """

        self.fs.set_allow_dirfrags(True)

        file_count = 100
        file_names = ["%s" % n for n in range(0, file_count)]

        # Create a directory of `file_count` files, each named after its
        # decimal number and containing the string of its decimal number
        self.mount_a.run_python(dedent("""
        import os
        path = os.path.join("{path}", "subdir")
        os.mkdir(path)
        for n in range(0, {file_count}):
            open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
        """.format(
            path=self.mount_a.mountpoint,
            file_count=file_count
        )))

        dir_ino = self.mount_a.path_to_ino("subdir")

        # Only one MDS should be active!
        self.assertEqual(len(self.fs.get_active_names()), 1)

        # Ensure that one directory is fragmented
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir", "0/0", "1"], mds_id)

        # Flush journal and stop MDS
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.fs.mds_stop()
        self.fs.mds_fail()

        # Pick a dentry and wipe out its key
        # Because I did a 1 bit split, I know one frag will be named <inode>.01000000
        frag_obj_id = "{0:x}.01000000".format(dir_ino)
        keys = self._dirfrag_keys(frag_obj_id)
        victim_key = keys[7]  # arbitrary choice
        log.info("victim_key={0}".format(victim_key))
        victim_dentry = victim_key.split("_head")[0]
        self.fs.rados(["rmomapkey", frag_obj_id, victim_key])
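
        # Dirfrag objects are named "<dir inode in hex>.<frag id>", and each
        # dentry in the fragment is stored as an omap key of the form
        # "<name>_head" for the live (non-snapshot) dentry.  Removing the key
        # therefore makes the dentry vanish without touching the file's data
        # objects in the data pool.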

        # Start filesystem back up, observe that the file appears to be gone in an `ls`
        self.fs.mds_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        files = self.mount_a.run_shell(["ls", "subdir/"]).stdout.getvalue().strip().split("\n")
        self.assertListEqual(sorted(files), sorted(list(set(file_names) - set([victim_dentry]))))

        # Stop the filesystem
        self.mount_a.umount_wait()
        self.fs.mds_stop()
        self.fs.mds_fail()

        # Run data-scan, observe that it inserts our dentry back into the correct fragment
        # by checking the omap now has the dentry's key again
        self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()])
        self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()])
        self.assertIn(victim_key, self._dirfrag_keys(frag_obj_id))

        # Start the filesystem and check that the dentry we deleted is now once again visible
        # and points to the correct file data.
        self.fs.mds_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        out = self.mount_a.run_shell(["cat", "subdir/{0}".format(victim_dentry)]).stdout.getvalue().strip()
        self.assertEqual(out, victim_dentry)

        # Finally, close the loop by checking our injected dentry survives a merge
        mds_id = self.fs.get_active_names()[0]
        self.mount_a.ls("subdir")  # Do an ls to ensure both frags are in cache so the merge will work
        self.fs.mds_asok(["dirfrag", "merge", "/subdir", "0/0"], mds_id)
        self.fs.mds_asok(["flush", "journal"], mds_id)
        frag_obj_id = "{0:x}.00000000".format(dir_ino)
        keys = self._dirfrag_keys(frag_obj_id)
        self.assertListEqual(sorted(keys), sorted(["%s_head" % f for f in file_names]))

    @for_teuthology
    def test_parallel_execution(self):
        self._rebuild_metadata(ManyFilesWorkload(self.fs, self.mount_a, 25), workers=7)

    def test_pg_files(self):
        """
        That the pg files command tells us which files are associated with
        a particular PG
        """
        file_count = 20
        self.mount_a.run_shell(["mkdir", "mydir"])
        self.mount_a.create_n_files("mydir/myfile", file_count)

        # Some files elsewhere in the system that we will ignore
        # to check that the tool is filtering properly
        self.mount_a.run_shell(["mkdir", "otherdir"])
        self.mount_a.create_n_files("otherdir/otherfile", file_count)

        pgs_to_files = defaultdict(list)
        # Rough (slow) reimplementation of the logic
        for i in range(0, file_count):
            file_path = "mydir/myfile_{0}".format(i)
            ino = self.mount_a.path_to_ino(file_path)
            obj = "{0:x}.{1:08x}".format(ino, 0)
            pgid = json.loads(self.fs.mon_manager.raw_cluster_cmd(
                "osd", "map", self.fs.get_data_pool_name(), obj,
                "--format=json-pretty"
            ))['pgid']
            pgs_to_files[pgid].append(file_path)
            log.info("{0}: {1}".format(file_path, pgid))
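
        # Each of these small files has (presumably) a single data object named
        # "<ino hex>.00000000"; "ceph osd map <pool> <obj>" reports which PG
        # that object hashes to, so this builds the expected PG -> files
        # mapping that the pg_files output is checked against below.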

        pg_count = self.fs.get_pgs_per_fs_pool()
        for pg_n in range(0, pg_count):
            pg_str = "{0}.{1}".format(self.fs.get_data_pool_id(), pg_n)
            out = self.fs.data_scan(["pg_files", "mydir", pg_str])
            lines = [l for l in out.split("\n") if l]
            log.info("{0}: {1}".format(pg_str, lines))
            self.assertSetEqual(set(lines), set(pgs_to_files[pg_str]))

    def test_scan_links(self):
        """
        The scan_links command fixes linkage errors
        """
        self.mount_a.run_shell(["mkdir", "testdir1"])
        self.mount_a.run_shell(["mkdir", "testdir2"])
        dir1_ino = self.mount_a.path_to_ino("testdir1")
        dir2_ino = self.mount_a.path_to_ino("testdir2")
        dirfrag1_oid = "{0:x}.00000000".format(dir1_ino)
        dirfrag2_oid = "{0:x}.00000000".format(dir2_ino)

        self.mount_a.run_shell(["touch", "testdir1/file1"])
        self.mount_a.run_shell(["ln", "testdir1/file1", "testdir1/link1"])
        self.mount_a.run_shell(["ln", "testdir1/file1", "testdir2/link2"])

        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["flush", "journal"], mds_id)

        dirfrag1_keys = self._dirfrag_keys(dirfrag1_oid)

        # introduce duplicated primary link
        file1_key = "file1_head"
        self.assertIn(file1_key, dirfrag1_keys)
        file1_omap_data = self.fs.rados(["getomapval", dirfrag1_oid, file1_key, '-'])
        self.fs.rados(["setomapval", dirfrag2_oid, file1_key], stdin_data=file1_omap_data)
        self.assertIn(file1_key, self._dirfrag_keys(dirfrag2_oid))
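
        # At this point the same inode has a primary dentry in two different
        # directories, which is the kind of linkage error scan_links is meant
        # to detect and repair.  Below we bump the version of the good dentry
        # in testdir1, so the expectation is that scan_links keeps that one
        # and drops the stale copy in testdir2.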

        # remove a remote link, make inode link count incorrect
        link1_key = 'link1_head'
        self.assertIn(link1_key, dirfrag1_keys)
        self.fs.rados(["rmomapkey", dirfrag1_oid, link1_key])

        # increase good primary link's version
        self.mount_a.run_shell(["touch", "testdir1/file1"])
        self.mount_a.umount_wait()

        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.fs.mds_stop()
        self.fs.mds_fail()

        # repair linkage errors
        self.fs.data_scan(["scan_links"])

        # primary link in testdir2 was deleted?
        self.assertNotIn(file1_key, self._dirfrag_keys(dirfrag2_oid))

        self.fs.mds_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # link count was adjusted?
        file1_nlink = self.mount_a.path_to_nlink("testdir1/file1")
        self.assertEqual(file1_nlink, 2)