# Tests for CephFS directory fragmentation (dirfrag split/merge behaviour).
f67539c2 | 1 | from io import StringIO |
7c673cae FG |
2 | |
3 | from tasks.cephfs.cephfs_test_case import CephFSTestCase | |
4 | from teuthology.orchestra import run | |
5 | ||
f67539c2 TL |
6 | import os |
7 | import time | |
7c673cae FG |
8 | import logging |
9 | log = logging.getLogger(__name__) | |
10 | ||
11 | ||
class TestFragmentation(CephFSTestCase):
    """
    Exercise MDS directory fragmentation: automatic and manual dirfrag
    split/merge, observed through the MDS perf counters ('dir_split',
    'dir_merge') and the MDS cache dump.
    """
    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1

    def get_splits(self):
        """Return the MDS 'dir_split' perf counter (splits performed so far)."""
        return self.fs.mds_asok(['perf', 'dump', 'mds'])['mds']['dir_split']

    def get_merges(self):
        """Return the MDS 'dir_merge' perf counter (merges performed so far)."""
        return self.fs.mds_asok(['perf', 'dump', 'mds'])['mds']['dir_merge']

    def get_dir_ino(self, path):
        """
        Find `path`'s inode entry (a dict, including 'dirfrags') in the MDS
        cache dump.  Asserts that the inode is present in cache.
        """
        dir_cache = self.fs.read_cache(path, 0)
        dir_ino = None
        # Resolve the path to an inode number via the client mount, then
        # locate the matching entry in the cache dump.
        dir_inono = self.mount_a.path_to_ino(path.strip("/"))
        for ino in dir_cache:
            if ino['ino'] == dir_inono:
                dir_ino = ino
                break
        self.assertIsNotNone(dir_ino)
        return dir_ino

    def _configure(self, **kwargs):
        """
        Apply kwargs as MDS configuration settings, enable dirfrags
        and restart the MDSs.
        """

        for k, v in kwargs.items():
            self.ceph_cluster.set_ceph_conf("mds", k, v.__str__())

        # Settings only take effect after the daemons restart.
        self.mds_cluster.mds_fail_restart()
        self.fs.wait_for_daemons()

    def test_oversize(self):
        """
        That a directory is split when it becomes too large.
        """

        split_size = 20
        merge_size = 5

        self._configure(
            mds_bal_split_size=split_size,
            mds_bal_merge_size=merge_size,
            mds_bal_split_bits=1
        )

        self.assertEqual(self.get_splits(), 0)

        # One more file than split_size should push the dir over the limit.
        self.mount_a.create_n_files("splitdir/file", split_size + 1)

        self.wait_until_true(
            lambda: self.get_splits() == 1,
            timeout=30
        )

        # split_bits=1 means a split produces exactly two child fragments.
        frags = self.get_dir_ino("/splitdir")['dirfrags']
        self.assertEqual(len(frags), 2)
        self.assertEqual(frags[0]['dirfrag'], "0x10000000000.0*")
        self.assertEqual(frags[1]['dirfrag'], "0x10000000000.1*")
        # No dentries should be lost across the split.
        self.assertEqual(
            sum([len(f['dentries']) for f in frags]),
            split_size + 1
        )

        self.assertEqual(self.get_merges(), 0)

        # Deleting everything should shrink the dir below merge_size and
        # trigger a merge back to a single fragment.
        self.mount_a.run_shell(["rm", "-f", run.Raw("splitdir/file*")])

        self.wait_until_true(
            lambda: self.get_merges() == 1,
            timeout=30
        )

        self.assertEqual(len(self.get_dir_ino("/splitdir")["dirfrags"]), 1)

    def test_rapid_creation(self):
        """
        That the fast-splitting limit of 1.5x normal limit is
        applied when creating dentries quickly.
        """

        split_size = 100
        merge_size = 1

        self._configure(
            mds_bal_split_size=split_size,
            mds_bal_merge_size=merge_size,
            mds_bal_split_bits=3,
            mds_bal_fragment_size_max=int(split_size * 1.5 + 2)
        )

        # We test this only at a single split level.  If a client was sending
        # IO so fast that it hit a second split before the first split
        # was complete, it could violate mds_bal_fragment_size_max -- there
        # is a window where the child dirfrags of a split are unfrozen
        # (so they can grow), but still have STATE_FRAGMENTING (so they
        # can't be split).

        # By writing 4x the split size when the split bits are set
        # to 3 (i.e. 4-ways), I am reasonably sure to see precisely
        # one split.  The test is to check whether that split
        # happens soon enough that the client doesn't exceed
        # 2x the split_size (the "immediate" split mode should
        # kick in at 1.5x the split size).

        self.assertEqual(self.get_splits(), 0)
        self.mount_a.create_n_files("splitdir/file", split_size * 4)
        self.wait_until_equal(
            self.get_splits,
            1,
            reject_fn=lambda s: s > 1,
            timeout=30
        )

    def test_deep_split(self):
        """
        That when the directory grows many times larger than split size,
        the fragments get split again.
        """

        split_size = 100
        merge_size = 1  # i.e. don't merge frag unless its empty
        split_bits = 1

        branch_factor = 2**split_bits

        # Arbitrary: how many levels shall we try fragmenting before
        # ending the test?
        max_depth = 5

        self._configure(
            mds_bal_split_size=split_size,
            mds_bal_merge_size=merge_size,
            mds_bal_split_bits=split_bits
        )

        # Each iteration we will create another level of fragments.  The
        # placement of dentries into fragments is by hashes (i.e. pseudo
        # random), so we rely on statistics to get the behaviour that
        # by writing about 1.5x as many dentries as the split_size times
        # the number of frags, we will get them all to exceed their
        # split size and trigger a split.
        depth = 0
        files_written = 0
        splits_expected = 0
        while depth < max_depth:
            log.info("Writing files for depth {0}".format(depth))
            target_files = branch_factor**depth * int(split_size * 1.5)
            create_files = target_files - files_written

            # Bracket the file creation in the cluster log so that failures
            # can be correlated with MDS activity afterwards.
            self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
                "{0} Writing {1} files (depth={2})".format(
                    self.__class__.__name__, create_files, depth
                ))
            self.mount_a.create_n_files("splitdir/file_{0}".format(depth),
                                        create_files)
            self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
                "{0} Done".format(self.__class__.__name__))

            files_written += create_files
            log.info("Now have {0} files".format(files_written))

            # Every existing fragment at this depth is expected to split once.
            splits_expected += branch_factor**depth
            log.info("Waiting to see {0} splits".format(splits_expected))
            try:
                self.wait_until_equal(
                    self.get_splits,
                    splits_expected,
                    timeout=30,
                    reject_fn=lambda x: x > splits_expected
                )

                frags = self.get_dir_ino("/splitdir")['dirfrags']
                self.assertEqual(len(frags), branch_factor**(depth+1))
                # No dentries should be lost across the splits.
                self.assertEqual(
                    sum([len(f['dentries']) for f in frags]),
                    target_files
                )
            except:
                # On failures, log what fragmentation we actually ended
                # up with.  This block is just for logging, at the end
                # we raise the exception again.
                frags = self.get_dir_ino("/splitdir")['dirfrags']
                log.info("depth={0} splits_expected={1} files_written={2}".format(
                    depth, splits_expected, files_written
                ))
                log.info("Dirfrags:")
                for f in frags:
                    log.info("{0}: {1}".format(
                        f['dirfrag'], len(f['dentries'])
                    ))
                raise

            depth += 1

        # Remember the inode number because we will be checking for
        # objects later.
        dir_inode_no = self.mount_a.path_to_ino("splitdir")

        self.mount_a.run_shell(["rm", "-rf", "splitdir/"])
        self.mount_a.umount_wait()

        self.fs.mds_asok(['flush', 'journal'])

        def _check_pq_finished():
            # Purging is complete once there are no strays left and the
            # purge queue has no executing ops.
            num_strays = self.fs.mds_asok(['perf', 'dump', 'mds_cache'])['mds_cache']['num_strays']
            pq_ops = self.fs.mds_asok(['perf', 'dump', 'purge_queue'])['purge_queue']['pq_executing']
            return num_strays == 0 and pq_ops == 0

        # Wait for all strays to purge
        self.wait_until_true(
            lambda: _check_pq_finished(),
            timeout=1200
        )
        # Check that the metadata pool objects for all the myriad
        # child fragments are gone
        metadata_objs = self.fs.radosmo(["ls"], stdout=StringIO()).strip()
        frag_objs = []
        # Dirfrag objects are named "<ino hex>.<frag>", so prefix-match on
        # the hex inode number.
        for o in metadata_objs.split("\n"):
            if o.startswith("{0:x}.".format(dir_inode_no)):
                frag_objs.append(o)
        self.assertListEqual(frag_objs, [])

    def test_split_straydir(self):
        """
        That stray dir is split when it becomes too large.
        """
        def _count_fragmented():
            # Count how many dirs under ~mdsdir have more than one dirfrag.
            # Inode 0x100 is skipped — presumably the ~mdsdir inode itself;
            # TODO confirm against MDS inode numbering.
            mdsdir_cache = self.fs.read_cache("~mdsdir", 1)
            num = 0
            for ino in mdsdir_cache:
                if ino["ino"] == 0x100:
                    continue
                if len(ino["dirfrags"]) > 1:
                    log.info("straydir 0x{:X} is fragmented".format(ino["ino"]))
                    num += 1;
            return num

        split_size = 50
        merge_size = 5
        split_bits = 1

        self._configure(
            mds_bal_split_size=split_size,
            mds_bal_merge_size=merge_size,
            mds_bal_split_bits=split_bits,
            # Keep the hard limit far away so only split/merge heuristics
            # are in play.
            mds_bal_fragment_size_max=(split_size * 100)
        )

        # manually split/merge
        self.assertEqual(_count_fragmented(), 0)
        self.fs.mds_asok(["dirfrag", "split", "~mdsdir/stray8", "0/0", "1"])
        self.fs.mds_asok(["dirfrag", "split", "~mdsdir/stray9", "0/0", "1"])
        self.wait_until_true(
            lambda: _count_fragmented() == 2,
            timeout=30
        )

        time.sleep(30)

        self.fs.mds_asok(["dirfrag", "merge", "~mdsdir/stray8", "0/0"])
        self.wait_until_true(
            lambda: _count_fragmented() == 1,
            timeout=30
        )

        time.sleep(30)

        # auto merge

        # merging stray dirs is driven by MDCache::advance_stray()
        # advance stray dir 10 times
        for _ in range(10):
            self.fs.mds_asok(['flush', 'journal'])

        self.wait_until_true(
            lambda: _count_fragmented() == 0,
            timeout=30
        )

        # auto split

        # there are 10 stray dirs. advance stray dir 20 times
        self.mount_a.create_n_files("testdir1/file", split_size * 20)
        self.mount_a.run_shell(["mkdir", "testdir2"])
        # Hard-link every file into testdir2 so that unlinking testdir1
        # later leaves referenced inodes behind in the stray dirs.
        testdir1_path = os.path.join(self.mount_a.mountpoint, "testdir1")
        for i in self.mount_a.ls(testdir1_path):
            self.mount_a.run_shell(["ln", "testdir1/{0}".format(i), "testdir2/"])

        self.mount_a.umount_wait()
        self.mount_a.mount_wait()
        self.mount_a.wait_until_mounted()

        # flush journal and restart mds. after restart, testdir2 is not in mds' cache
        self.fs.mds_asok(['flush', 'journal'])
        self.mds_cluster.mds_fail_restart()
        self.fs.wait_for_daemons()
        # splitting stray dirs is driven by MDCache::advance_stray()
        # advance stray dir after unlink 'split_size' files.
        self.fs.mds_asok(['config', 'set', 'mds_log_events_per_segment', str(split_size)])

        self.assertEqual(_count_fragmented(), 0)
        self.mount_a.run_shell(["rm", "-rf", "testdir1"])
        self.wait_until_true(
            lambda: _count_fragmented() > 0,
            timeout=30
        )

    def test_dir_merge_with_snap_items(self):
        """
        That directory remain fragmented when snapshot items are taken into account.
        """
        split_size = 1000
        merge_size = 100
        self._configure(
            mds_bal_split_size=split_size,
            mds_bal_merge_size=merge_size,
            mds_bal_split_bits=1
        )

        # split the dir
        create_files = split_size + 50
        self.mount_a.create_n_files("splitdir/file_", create_files)

        self.wait_until_true(
            lambda: self.get_splits() == 1,
            timeout=30
        )

        frags = self.get_dir_ino("/splitdir")['dirfrags']
        self.assertEqual(len(frags), 2)
        self.assertEqual(frags[0]['dirfrag'], "0x10000000000.0*")
        self.assertEqual(frags[1]['dirfrag'], "0x10000000000.1*")
        self.assertEqual(
            sum([len(f['dentries']) for f in frags]), create_files
        )

        self.assertEqual(self.get_merges(), 0)

        # Take snapshots before deleting, so that the deleted files remain
        # pinned as snapshotted items in the fragments.
        self.mount_a.run_shell(["mkdir", "splitdir/.snap/snap_a"])
        self.mount_a.run_shell(["mkdir", "splitdir/.snap/snap_b"])
        self.mount_a.run_shell(["rm", "-f", run.Raw("splitdir/file*")])

        time.sleep(30)

        # Despite the directory now being "empty", the snapshot items must
        # prevent a merge: still no merges, still two fragments.
        self.assertEqual(self.get_merges(), 0)
        self.assertEqual(len(self.get_dir_ino("/splitdir")["dirfrags"]), 2)