]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | |
2 | """ | |
3 | Test our tools for recovering metadata from the data pool | |
4 | """ | |
5 | import json | |
6 | ||
7 | import logging | |
8 | import os | |
9 | from textwrap import dedent | |
10 | import traceback | |
11 | from collections import namedtuple, defaultdict | |
12 | ||
13 | from teuthology.orchestra.run import CommandFailedError | |
14 | from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology | |
15 | ||
16 | log = logging.getLogger(__name__) | |
17 | ||
18 | ||
ValidationError = namedtuple("ValidationError", ["exception", "backtrace"])


class Workload(object):
    """
    Base class for a metadata-recovery workload: write some files through a
    client mount, optionally damage the pools, and later validate what the
    recovery tools managed to rebuild.
    """
    def __init__(self, filesystem, mount):
        self._mount = mount
        self._filesystem = filesystem
        self._initial_state = None

        # Accumulate backtraces for every failed validation, and return them. Backtraces
        # are rather verbose, but we only see them when something breaks, and they
        # let us see which check failed without having to decorate each check with
        # a string
        self._errors = []

    def assert_equal(self, a, b):
        """Record (rather than raise) a ValidationError if a != b."""
        try:
            if a != b:
                raise AssertionError("{0} != {1}".format(a, b))
        except AssertionError as e:
            self._errors.append(
                ValidationError(e, traceback.format_exc(3))
            )

    def write(self):
        """
        Write the workload files to the mount
        """
        raise NotImplementedError()

    def validate(self):
        """
        Read from the mount and validate that the workload files are present (i.e. have
        survived or been reconstructed from the test scenario)

        :return: list of ValidationError, empty on success
        """
        raise NotImplementedError()

    def damage(self):
        """
        Damage the filesystem pools in ways that will be interesting to recover from.  By
        default just wipe everything in the metadata pool
        """
        # Delete every object in the metadata pool.  Skip empty names so that
        # a trailing newline in the listing (or an empty pool) does not turn
        # into an invalid `rados rm ""` call.
        objects = self._filesystem.rados(["ls"]).split("\n")
        for o in objects:
            if o:
                self._filesystem.rados(["rm", o])

    def flush(self):
        """
        Called after client unmount, after write: flush whatever you want
        """
        self._filesystem.mds_asok(["flush", "journal"])
71 | ||
72 | ||
class SimpleWorkload(Workload):
    """
    One directory holding one file: after recovery both must be visible
    again and the file must report its original size.
    """
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def validate(self):
        # Listing must succeed, and the recovered file keeps its size
        self._mount.run_shell(["ls", "subdir"])
        recovered = self._mount.stat("subdir/sixmegs")
        self.assert_equal(recovered['st_size'], self._initial_state['st_size'])
        return self._errors
87 | ||
88 | ||
class MovedFile(Workload):
    """
    A file whose backtrace disagrees with its final location in the
    metadata: recovery should reconstruct it where the backtrace says.
    """
    def write(self):
        # Create a file whose backtrace disagrees with his eventual position
        # in the metadata.  We will see that he gets reconstructed in his
        # original position according to his backtrace.
        for d in ["subdir_alpha", "subdir_bravo"]:
            self._mount.run_shell(["mkdir", d])
        self._mount.write_n_mb("subdir_alpha/sixmegs", 6)
        self._filesystem.mds_asok(["flush", "journal"])
        self._mount.run_shell(["mv", "subdir_alpha/sixmegs", "subdir_bravo/sixmegs"])
        self._initial_state = self._mount.stat("subdir_bravo/sixmegs")

    def flush(self):
        # Deliberately leave the journal dirty so the stale backtrace
        # (pointing at subdir_alpha) is what recovery sees
        pass

    def validate(self):
        self.assert_equal(self._mount.ls(), ["subdir_alpha"])
        recovered = self._mount.stat("subdir_alpha/sixmegs")
        self.assert_equal(recovered['st_size'], self._initial_state['st_size'])
        return self._errors
109 | ||
110 | ||
class BacktracelessFile(Workload):
    """
    A file written but never flushed, so it has no backtrace: recovery
    should place it in lost+found, preserving its size.
    """
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def flush(self):
        # Never flush metadata, so backtrace won't be written
        pass

    def validate(self):
        ino_name = "%x" % self._initial_state["st_ino"]

        # With no path available, the inode must be linked into lost+found
        self.assert_equal(self._mount.ls(), ["lost+found"])
        self.assert_equal(self._mount.ls("lost+found"), [ino_name])
        recovered = self._mount.stat(os.path.join("lost+found", ino_name))

        # The name/path may be lost, but the size must survive
        self.assert_equal(recovered['st_size'], self._initial_state['st_size'])

        return self._errors
133 | ||
134 | ||
class StripedStashedLayout(Workload):
    """
    Files written under a striped directory layout: flushed files must be
    recovered in place with readable data, while the unflushed one (no
    layout xattr) can only land in lost+found with junk contents.
    """
    def __init__(self, fs, m):
        super(StripedStashedLayout, self).__init__(fs, m)

        # Nice small stripes so we can quickly do our writes+validates
        self.sc = 4
        self.ss = 65536
        self.os = 262144

        # Sizes chosen to exercise the boundaries of the striping
        # arithmetic.  Floor division (//) keeps these integers on
        # Python 3 as well; on Python 2 ints it is identical to /.
        self.interesting_sizes = [
            # Exactly stripe_count objects will exist
            self.os * self.sc,
            # Fewer than stripe_count objects will exist
            self.os * self.sc // 2,
            self.os * (self.sc - 1) + self.os // 2,
            self.os * (self.sc - 1) + self.os // 2 - 1,
            self.os * (self.sc + 1) + self.os // 2,
            self.os * (self.sc + 1) + self.os // 2 + 1,
            # More than stripe_count objects will exist
            self.os * self.sc + self.os * self.sc // 2
        ]

    def write(self):
        # Create a dir with a striped layout set on it
        self._mount.run_shell(["mkdir", "stripey"])

        self._mount.setfattr("./stripey", "ceph.dir.layout",
             "stripe_unit={ss} stripe_count={sc} object_size={os} pool={pool}".format(
                 ss=self.ss, os=self.os, sc=self.sc,
                 pool=self._filesystem.get_data_pool_name()
             ))

        # Write files, then flush metadata so that its layout gets written into an xattr
        for i, n_bytes in enumerate(self.interesting_sizes):
            self._mount.write_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
            # This is really just validating the validator
            self._mount.validate_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
        self._filesystem.mds_asok(["flush", "journal"])

        # Write another file in the same way, but this time don't flush the metadata,
        # so that it won't have the layout xattr
        self._mount.write_test_pattern("stripey/unflushed_file", 1024 * 512)
        self._mount.validate_test_pattern("stripey/unflushed_file", 1024 * 512)

        self._initial_state = {
            "unflushed_ino": self._mount.path_to_ino("stripey/unflushed_file")
        }

    def flush(self):
        # Pass because we already selectively flushed during write
        pass

    def validate(self):
        # The flushed files should have been recovered into their original
        # locations with the correct layout: read back correct data
        for i, n_bytes in enumerate(self.interesting_sizes):
            try:
                self._mount.validate_test_pattern("stripey/flushed_file_{0}".format(i), n_bytes)
            except CommandFailedError as e:
                self._errors.append(
                    ValidationError("File {0} (size {1}): {2}".format(i, n_bytes, e), traceback.format_exc(3))
                )

        # The unflushed file should have been recovered into lost+found without
        # the correct layout: read back junk
        ino_name = "%x" % self._initial_state["unflushed_ino"]
        self.assert_equal(self._mount.ls("lost+found"), [ino_name])
        try:
            self._mount.validate_test_pattern(os.path.join("lost+found", ino_name), 1024 * 512)
        except CommandFailedError:
            pass
        else:
            self._errors.append(
                ValidationError("Unexpectedly valid data in unflushed striped file", "")
            )

        return self._errors
212 | ||
213 | ||
class ManyFilesWorkload(Workload):
    """
    A configurable number of 6MB patterned files; every one must read back
    with correct contents after recovery.
    """
    def __init__(self, filesystem, mount, file_count):
        super(ManyFilesWorkload, self).__init__(filesystem, mount)
        self.file_count = file_count

    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        for idx in range(self.file_count):
            self._mount.write_test_pattern("subdir/{0}".format(idx), 6 * 1024 * 1024)

    def validate(self):
        size = 6 * 1024 * 1024
        for idx in range(self.file_count):
            path = "subdir/{0}".format(idx)
            try:
                self._mount.validate_test_pattern(path, size)
            except CommandFailedError as e:
                # Record the failure and keep checking the remaining files
                self._errors.append(
                    ValidationError("File {0}: {1}".format(idx, e), traceback.format_exc(3))
                )

        return self._errors
234 | ||
235 | ||
class MovedDir(Workload):
    """
    A directory renamed between journal flushes, leaving two files whose
    backtraces claim different parents for it: exactly one location wins.
    """
    def write(self):
        # Create a nested dir that we will then move. Two files with two different
        # backtraces referring to the moved dir, claiming two different locations for
        # it. We will see that only one backtrace wins and the dir ends up with
        # single linkage.
        self._mount.run_shell(["mkdir", "-p", "grandmother/parent"])
        self._mount.write_n_mb("grandmother/parent/orig_pos_file", 1)
        self._filesystem.mds_asok(["flush", "journal"])
        self._mount.run_shell(["mkdir", "grandfather"])
        self._mount.run_shell(["mv", "grandmother/parent", "grandfather"])
        self._mount.write_n_mb("grandfather/parent/new_pos_file", 2)
        self._filesystem.mds_asok(["flush", "journal"])

        self._initial_state = (
            self._mount.stat("grandfather/parent/orig_pos_file"),
            self._mount.stat("grandfather/parent/new_pos_file")
        )

    def validate(self):
        # Exactly one of the two candidate parents should have survived
        root_files = self._mount.ls()
        self.assert_equal(len(root_files), 1)
        self.assert_equal(root_files[0] in ["grandfather", "grandmother"], True)
        winner = root_files[0]
        st_opf = self._mount.stat("{0}/parent/orig_pos_file".format(winner))
        st_npf = self._mount.stat("{0}/parent/new_pos_file".format(winner))

        self.assert_equal(st_opf['st_size'], self._initial_state[0]['st_size'])
        self.assert_equal(st_npf['st_size'], self._initial_state[1]['st_size'])

        # Return the accumulated failures: previously this method returned
        # None, so _rebuild_metadata silently ignored all validation errors
        return self._errors
265 | ||
266 | ||
class MissingZerothObject(Workload):
    """
    As well as wiping the metadata pool, delete the file's zeroth data
    object: the inode should still be recovered into lost+found.
    """
    def write(self):
        self._mount.run_shell(["mkdir", "subdir"])
        self._mount.write_n_mb("subdir/sixmegs", 6)
        self._initial_state = self._mount.stat("subdir/sixmegs")

    def damage(self):
        super(MissingZerothObject, self).damage()
        # Additionally remove the file's first data object (<ino>.00000000)
        zeroth_id = "{0:x}.00000000".format(self._initial_state['st_ino'])
        self._filesystem.rados(["rm", zeroth_id], pool=self._filesystem.get_data_pool_name())

    def validate(self):
        st = self._mount.stat("lost+found/{0:x}".format(self._initial_state['st_ino']))
        self.assert_equal(st['st_size'], self._initial_state['st_size'])

        # Return the accumulated failures: previously this method returned
        # None, so _rebuild_metadata silently ignored all validation errors
        return self._errors
281 | ||
282 | ||
class NonDefaultLayout(Workload):
    """
    Check that the reconstruction copes with files that have a different
    object size in their layout
    """
    def write(self):
        self._mount.run_shell(["touch", "datafile"])
        self._mount.setfattr("./datafile", "ceph.file.layout.object_size", "8388608")
        self._mount.run_shell(["dd", "if=/dev/urandom", "of=./datafile", "bs=1M", "count=32"])
        self._initial_state = self._mount.stat("datafile")

    def validate(self):
        # Check we got the layout reconstructed properly
        object_size = int(self._mount.getfattr(
            "./datafile", "ceph.file.layout.object_size"))
        self.assert_equal(object_size, 8388608)

        # Check we got the file size reconstructed properly
        st = self._mount.stat("datafile")
        self.assert_equal(st['st_size'], self._initial_state['st_size'])

        # Return the accumulated failures: previously this method returned
        # None, so _rebuild_metadata silently ignored all validation errors
        return self._errors
303 | ||
304 | ||
class TestDataScan(CephFSTestCase):
    """
    Exercise cephfs-data-scan and friends: rebuild the metadata pool from
    whatever survives in the data pool after various kinds of damage.
    """
    # Two MDS daemons, so we can observe every daemon dropping to standby
    # when rank 0 is marked damaged.
    MDSS_REQUIRED = 2

    def is_marked_damaged(self, rank):
        """Return True if MDS rank `rank` appears in the MDS map's damaged list."""
        mds_map = self.fs.get_mds_map()
        return rank in mds_map['damaged']

    def _rebuild_metadata(self, workload, workers=1):
        """
        That when all objects in metadata pool are removed, we can rebuild a metadata pool
        based on the contents of a data pool, and a client can see and read our files.

        :param workload: Workload instance that writes, damages and validates
        :param workers: worker_count passed to the data_scan phases
        """

        # First, inject some files

        workload.write()

        # Unmount the client and flush the journal: the tool should also cope with
        # situations where there is dirty metadata, but we'll test that separately
        self.mount_a.umount_wait()
        workload.flush()

        # Stop the MDS
        self.fs.mds_stop()
        self.fs.mds_fail()

        # After recovery, we need the MDS to not be strict about stats (in production these options
        # are off by default, but in QA we need to explicitly disable them)
        self.fs.set_ceph_conf('mds', 'mds verify scatter', False)
        self.fs.set_ceph_conf('mds', 'mds debug scatterstat', False)

        # Apply any data damage the workload wants
        workload.damage()

        # Reset the MDS map in case multiple ranks were in play: recovery procedure
        # only understands how to rebuild metadata under rank 0
        self.fs.mon_manager.raw_cluster_cmd('fs', 'reset', self.fs.name,
                                            '--yes-i-really-mean-it')

        self.fs.mds_restart()

        def get_state(mds_id):
            # Current state string for one daemon, or None if absent from map
            info = self.mds_cluster.get_mds_info(mds_id)
            return info['state'] if info is not None else None

        # With the metadata pool wiped, rank 0 should get marked damaged and
        # all daemons should fall back to standby
        self.wait_until_true(lambda: self.is_marked_damaged(0), 60)
        for mds_id in self.fs.mds_ids:
            self.wait_until_equal(
                    lambda: get_state(mds_id),
                    "up:standby",
                    timeout=60)

        # Wipe session/snap/inode tables before rebuilding
        self.fs.table_tool([self.fs.name + ":0", "reset", "session"])
        self.fs.table_tool([self.fs.name + ":0", "reset", "snap"])
        self.fs.table_tool([self.fs.name + ":0", "reset", "inode"])

        # Run the recovery procedure
        # NOTE(review): deliberately disabled branch (`if False`), kept to
        # document that a plain journal reset is expected to fail with no
        # objects present; the --force variant below is what we rely on.
        if False:
            with self.assertRaises(CommandFailedError):
                # Normal reset should fail when no objects are present, we'll use --force instead
                self.fs.journal_tool(["journal", "reset"])

        self.fs.journal_tool(["journal", "reset", "--force"])
        self.fs.data_scan(["init"])
        self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()], worker_count=workers)
        self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()], worker_count=workers)

        # Mark the MDS repaired
        self.fs.mon_manager.raw_cluster_cmd('mds', 'repaired', '0')

        # Start the MDS
        self.fs.mds_restart()
        self.fs.wait_for_daemons()
        log.info(str(self.mds_cluster.status()))

        # Mount a client
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # See that the files are present and correct
        errors = workload.validate()
        if errors:
            log.error("Validation errors found: {0}".format(len(errors)))
            for e in errors:
                log.error(e.exception)
                log.error(e.backtrace)
            raise AssertionError("Validation failed, first error: {0}\n{1}".format(
                errors[0].exception, errors[0].backtrace
            ))

    def test_rebuild_simple(self):
        self._rebuild_metadata(SimpleWorkload(self.fs, self.mount_a))

    def test_rebuild_moved_file(self):
        self._rebuild_metadata(MovedFile(self.fs, self.mount_a))

    def test_rebuild_backtraceless(self):
        self._rebuild_metadata(BacktracelessFile(self.fs, self.mount_a))

    def test_rebuild_moved_dir(self):
        self._rebuild_metadata(MovedDir(self.fs, self.mount_a))

    def test_rebuild_missing_zeroth(self):
        self._rebuild_metadata(MissingZerothObject(self.fs, self.mount_a))

    def test_rebuild_nondefault_layout(self):
        self._rebuild_metadata(NonDefaultLayout(self.fs, self.mount_a))

    def test_stashed_layout(self):
        self._rebuild_metadata(StripedStashedLayout(self.fs, self.mount_a))

    def _dirfrag_keys(self, object_id):
        """Return the omap key names of a dirfrag object, or [] if it has none."""
        keys_str = self.fs.rados(["listomapkeys", object_id])
        if keys_str:
            return keys_str.split("\n")
        else:
            return []

    def test_fragmented_injection(self):
        """
        That when injecting a dentry into a fragmented directory, we put it in the right fragment.
        """

        self.fs.set_allow_dirfrags(True)

        file_count = 100
        file_names = ["%s" % n for n in range(0, file_count)]

        # Create a directory of `file_count` files, each named after its
        # decimal number and containing the string of its decimal number
        self.mount_a.run_python(dedent("""
        import os
        path = os.path.join("{path}", "subdir")
        os.mkdir(path)
        for n in range(0, {file_count}):
            open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
        """.format(
            path=self.mount_a.mountpoint,
            file_count=file_count
        )))

        dir_ino = self.mount_a.path_to_ino("subdir")

        # Only one MDS should be active!
        self.assertEqual(len(self.fs.get_active_names()), 1)

        # Ensure that one directory is fragmented
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir", "0/0", "1"], mds_id)

        # Flush journal and stop MDS
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.fs.mds_stop()
        self.fs.mds_fail()

        # Pick a dentry and wipe out its key
        # Because I did a 1 bit split, I know one frag will be named <inode>.01000000
        frag_obj_id = "{0:x}.01000000".format(dir_ino)
        keys = self._dirfrag_keys(frag_obj_id)
        victim_key = keys[7]  # arbitrary choice
        log.info("victim_key={0}".format(victim_key))
        victim_dentry = victim_key.split("_head")[0]
        self.fs.rados(["rmomapkey", frag_obj_id, victim_key])

        # Start filesystem back up, observe that the file appears to be gone in an `ls`
        self.fs.mds_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        files = self.mount_a.run_shell(["ls", "subdir/"]).stdout.getvalue().strip().split("\n")
        self.assertListEqual(sorted(files), sorted(list(set(file_names) - set([victim_dentry]))))

        # Stop the filesystem
        self.mount_a.umount_wait()
        self.fs.mds_stop()
        self.fs.mds_fail()

        # Run data-scan, observe that it inserts our dentry back into the correct fragment
        # by checking the omap now has the dentry's key again
        self.fs.data_scan(["scan_extents", self.fs.get_data_pool_name()])
        self.fs.data_scan(["scan_inodes", self.fs.get_data_pool_name()])
        self.assertIn(victim_key, self._dirfrag_keys(frag_obj_id))

        # Start the filesystem and check that the dentry we deleted is now once again visible
        # and points to the correct file data.
        self.fs.mds_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        out = self.mount_a.run_shell(["cat", "subdir/{0}".format(victim_dentry)]).stdout.getvalue().strip()
        self.assertEqual(out, victim_dentry)

        # Finally, close the loop by checking our injected dentry survives a merge
        mds_id = self.fs.get_active_names()[0]
        self.mount_a.ls("subdir")  # Do an ls to ensure both frags are in cache so the merge will work
        self.fs.mds_asok(["dirfrag", "merge", "/subdir", "0/0"], mds_id)
        self.fs.mds_asok(["flush", "journal"], mds_id)
        frag_obj_id = "{0:x}.00000000".format(dir_ino)
        keys = self._dirfrag_keys(frag_obj_id)
        self.assertListEqual(sorted(keys), sorted(["%s_head" % f for f in file_names]))

    @for_teuthology
    def test_parallel_execution(self):
        # Larger workload with multiple data-scan workers (slow: teuthology only)
        self._rebuild_metadata(ManyFilesWorkload(self.fs, self.mount_a, 25), workers=7)

    def test_pg_files(self):
        """
        That the pg files command tells us which files are associated with
        a particular PG
        """
        file_count = 20
        self.mount_a.run_shell(["mkdir", "mydir"])
        self.mount_a.create_n_files("mydir/myfile", file_count)

        # Some files elsewhere in the system that we will ignore
        # to check that the tool is filtering properly
        self.mount_a.run_shell(["mkdir", "otherdir"])
        self.mount_a.create_n_files("otherdir/otherfile", file_count)

        pgs_to_files = defaultdict(list)
        # Rough (slow) reimplementation of the logic
        for i in range(0, file_count):
            file_path = "mydir/myfile_{0}".format(i)
            ino = self.mount_a.path_to_ino(file_path)
            # First data object of the file: <ino hex>.00000000
            obj = "{0:x}.{1:08x}".format(ino, 0)
            pgid = json.loads(self.fs.mon_manager.raw_cluster_cmd(
                "osd", "map", self.fs.get_data_pool_name(), obj,
                "--format=json-pretty"
            ))['pgid']
            pgs_to_files[pgid].append(file_path)
            log.info("{0}: {1}".format(file_path, pgid))

        pg_count = self.fs.get_pgs_per_fs_pool()
        for pg_n in range(0, pg_count):
            pg_str = "{0}.{1}".format(self.fs.get_data_pool_id(), pg_n)
            out = self.fs.data_scan(["pg_files", "mydir", pg_str])
            lines = [l for l in out.split("\n") if l]
            log.info("{0}: {1}".format(pg_str, lines))
            # The tool's answer must match our per-PG reimplementation
            self.assertSetEqual(set(lines), set(pgs_to_files[pg_str]))

    def test_scan_links(self):
        """
        The scan_links command fixes linkage errors
        """
        self.mount_a.run_shell(["mkdir", "testdir1"])
        self.mount_a.run_shell(["mkdir", "testdir2"])
        dir1_ino = self.mount_a.path_to_ino("testdir1")
        dir2_ino = self.mount_a.path_to_ino("testdir2")
        dirfrag1_oid = "{0:x}.00000000".format(dir1_ino)
        dirfrag2_oid = "{0:x}.00000000".format(dir2_ino)

        self.mount_a.run_shell(["touch", "testdir1/file1"])
        self.mount_a.run_shell(["ln", "testdir1/file1", "testdir1/link1"])
        self.mount_a.run_shell(["ln", "testdir1/file1", "testdir2/link2"])

        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["flush", "journal"], mds_id)

        dirfrag1_keys = self._dirfrag_keys(dirfrag1_oid)

        # introduce duplicated primary link
        file1_key = "file1_head"
        self.assertIn(file1_key, dirfrag1_keys)
        file1_omap_data = self.fs.rados(["getomapval", dirfrag1_oid, file1_key, '-'])
        self.fs.rados(["setomapval", dirfrag2_oid, file1_key], stdin_data=file1_omap_data)
        self.assertIn(file1_key, self._dirfrag_keys(dirfrag2_oid))

        # remove a remote link, make inode link count incorrect
        link1_key = 'link1_head'
        self.assertIn(link1_key, dirfrag1_keys)
        self.fs.rados(["rmomapkey", dirfrag1_oid, link1_key])

        # increase good primary link's version
        self.mount_a.run_shell(["touch", "testdir1/file1"])
        self.mount_a.umount_wait()

        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.fs.mds_stop()
        self.fs.mds_fail()

        # repair linkage errors
        self.fs.data_scan(["scan_links"])

        # primary link in testdir2 was deleted?
        self.assertNotIn(file1_key, self._dirfrag_keys(dirfrag2_oid))

        self.fs.mds_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # link count was adjusted?
        file1_nlink = self.mount_a.path_to_nlink("testdir1/file1")
        self.assertEqual(file1_nlink, 2)