]> git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/cephfs/test_fragment.py
1102f887b5f0a6cd28ac2f20cc5ba6a22a0ae6d5
[ceph.git] / ceph / qa / tasks / cephfs / test_fragment.py
1 from io import StringIO
2
3 from tasks.cephfs.cephfs_test_case import CephFSTestCase
4 from teuthology.orchestra import run
5
6 import os
7 import time
8 import logging
9 log = logging.getLogger(__name__)
10
11
class TestFragmentation(CephFSTestCase):
    """
    Exercise automatic directory fragment splitting and merging in the
    MDS, as driven by the mds_bal_split_size / mds_bal_merge_size /
    mds_bal_split_bits settings.
    """
    # A single client mount and a single MDS are sufficient for all of
    # these tests.
    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1

16 def get_splits(self):
17 return self.fs.mds_asok(['perf', 'dump', 'mds'])['mds']['dir_split']
18
19 def get_merges(self):
20 return self.fs.mds_asok(['perf', 'dump', 'mds'])['mds']['dir_merge']
21
22 def get_dir_ino(self, path):
23 dir_cache = self.fs.read_cache(path, 0)
24 dir_ino = None
25 dir_inono = self.mount_a.path_to_ino(path.strip("/"))
26 for ino in dir_cache:
27 if ino['ino'] == dir_inono:
28 dir_ino = ino
29 break
30 self.assertIsNotNone(dir_ino)
31 return dir_ino
32
33 def _configure(self, **kwargs):
34 """
35 Apply kwargs as MDS configuration settings, enable dirfrags
36 and restart the MDSs.
37 """
38
39 for k, v in kwargs.items():
40 self.ceph_cluster.set_ceph_conf("mds", k, v.__str__())
41
42 self.mds_cluster.mds_fail_restart()
43 self.fs.wait_for_daemons()
44
    def test_oversize(self):
        """
        That a directory is split when it becomes too large.
        """

        # Low thresholds so splits/merges trigger with few files.
        split_size = 20
        merge_size = 5

        self._configure(
            mds_bal_split_size=split_size,
            mds_bal_merge_size=merge_size,
            mds_bal_split_bits=1
        )

        # No splits should have happened yet on a fresh filesystem.
        self.assertEqual(self.get_splits(), 0)

        # One file more than split_size pushes the dir over the limit.
        self.mount_a.create_n_files("splitdir/file", split_size + 1)

        self.wait_until_true(
            lambda: self.get_splits() == 1,
            timeout=30
        )

        # With split_bits=1 the directory splits into exactly two
        # fragments, and no dentries may be lost across the split.
        frags = self.get_dir_ino("/splitdir")['dirfrags']
        self.assertEqual(len(frags), 2)
        self.assertEqual(frags[0]['dirfrag'], "0x10000000000.0*")
        self.assertEqual(frags[1]['dirfrag'], "0x10000000000.1*")
        self.assertEqual(
            sum([len(f['dentries']) for f in frags]),
            split_size + 1
        )

        self.assertEqual(self.get_merges(), 0)

        # Deleting the files shrinks the dir below merge_size, which
        # should trigger a merge back to a single fragment.
        self.mount_a.run_shell(["rm", "-f", run.Raw("splitdir/file*")])

        self.wait_until_true(
            lambda: self.get_merges() == 1,
            timeout=30
        )

        self.assertEqual(len(self.get_dir_ino("/splitdir")["dirfrags"]), 1)
87
    def test_rapid_creation(self):
        """
        That the fast-splitting limit of 1.5x normal limit is
        applied when creating dentries quickly.
        """

        split_size = 100
        merge_size = 1

        # fragment_size_max is set just above the 1.5x fast-split
        # threshold, so exceeding it would make the MDS refuse creates
        # and fail the run.
        self._configure(
            mds_bal_split_size=split_size,
            mds_bal_merge_size=merge_size,
            mds_bal_split_bits=3,
            mds_bal_fragment_size_max=int(split_size * 1.5 + 2)
        )

        # We test this only at a single split level. If a client was sending
        # IO so fast that it hit a second split before the first split
        # was complete, it could violate mds_bal_fragment_size_max -- there
        # is a window where the child dirfrags of a split are unfrozen
        # (so they can grow), but still have STATE_FRAGMENTING (so they
        # can't be split).

        # By writing 4x the split size when the split bits are set
        # to 3 (i.e. 4-ways), I am reasonably sure to see precisely
        # one split. The test is to check whether that split
        # happens soon enough that the client doesn't exceed
        # 2x the split_size (the "immediate" split mode should
        # kick in at 1.5x the split size).

        self.assertEqual(self.get_splits(), 0)
        self.mount_a.create_n_files("splitdir/file", split_size * 4)
        # Expect exactly one split; observing more than one means the
        # fast-split limit was not applied in time.
        self.wait_until_equal(
            self.get_splits,
            1,
            reject_fn=lambda s: s > 1,
            timeout=30
        )
126
127 def test_deep_split(self):
128 """
129 That when the directory grows many times larger than split size,
130 the fragments get split again.
131 """
132
133 split_size = 100
134 merge_size = 1 # i.e. don't merge frag unless its empty
135 split_bits = 1
136
137 branch_factor = 2**split_bits
138
139 # Arbitrary: how many levels shall we try fragmenting before
140 # ending the test?
141 max_depth = 5
142
143 self._configure(
144 mds_bal_split_size=split_size,
145 mds_bal_merge_size=merge_size,
146 mds_bal_split_bits=split_bits
147 )
148
149 # Each iteration we will create another level of fragments. The
150 # placement of dentries into fragments is by hashes (i.e. pseudo
151 # random), so we rely on statistics to get the behaviour that
152 # by writing about 1.5x as many dentries as the split_size times
153 # the number of frags, we will get them all to exceed their
154 # split size and trigger a split.
155 depth = 0
156 files_written = 0
157 splits_expected = 0
158 while depth < max_depth:
159 log.info("Writing files for depth {0}".format(depth))
160 target_files = branch_factor**depth * int(split_size * 1.5)
161 create_files = target_files - files_written
162
163 self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
164 "{0} Writing {1} files (depth={2})".format(
165 self.__class__.__name__, create_files, depth
166 ))
167 self.mount_a.create_n_files("splitdir/file_{0}".format(depth),
168 create_files)
169 self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
170 "{0} Done".format(self.__class__.__name__))
171
172 files_written += create_files
173 log.info("Now have {0} files".format(files_written))
174
175 splits_expected += branch_factor**depth
176 log.info("Waiting to see {0} splits".format(splits_expected))
177 try:
178 self.wait_until_equal(
179 self.get_splits,
180 splits_expected,
181 timeout=30,
182 reject_fn=lambda x: x > splits_expected
183 )
184
185 frags = self.get_dir_ino("/splitdir")['dirfrags']
186 self.assertEqual(len(frags), branch_factor**(depth+1))
187 self.assertEqual(
188 sum([len(f['dentries']) for f in frags]),
189 target_files
190 )
191 except:
192 # On failures, log what fragmentation we actually ended
193 # up with. This block is just for logging, at the end
194 # we raise the exception again.
195 frags = self.get_dir_ino("/splitdir")['dirfrags']
196 log.info("depth={0} splits_expected={1} files_written={2}".format(
197 depth, splits_expected, files_written
198 ))
199 log.info("Dirfrags:")
200 for f in frags:
201 log.info("{0}: {1}".format(
202 f['dirfrag'], len(f['dentries'])
203 ))
204 raise
205
206 depth += 1
207
208 # Remember the inode number because we will be checking for
209 # objects later.
210 dir_inode_no = self.mount_a.path_to_ino("splitdir")
211
212 self.mount_a.run_shell(["rm", "-rf", "splitdir/"])
213 self.mount_a.umount_wait()
214
215 self.fs.mds_asok(['flush', 'journal'])
216
217 def _check_pq_finished():
218 num_strays = self.fs.mds_asok(['perf', 'dump', 'mds_cache'])['mds_cache']['num_strays']
219 pq_ops = self.fs.mds_asok(['perf', 'dump', 'purge_queue'])['purge_queue']['pq_executing']
220 return num_strays == 0 and pq_ops == 0
221
222 # Wait for all strays to purge
223 self.wait_until_true(
224 lambda: _check_pq_finished(),
225 timeout=1200
226 )
227 # Check that the metadata pool objects for all the myriad
228 # child fragments are gone
229 metadata_objs = self.fs.radosmo(["ls"], stdout=StringIO()).strip()
230 frag_objs = []
231 for o in metadata_objs.split("\n"):
232 if o.startswith("{0:x}.".format(dir_inode_no)):
233 frag_objs.append(o)
234 self.assertListEqual(frag_objs, [])
235
236 def test_split_straydir(self):
237 """
238 That stray dir is split when it becomes too large.
239 """
240 def _count_fragmented():
241 mdsdir_cache = self.fs.read_cache("~mdsdir", 1)
242 num = 0
243 for ino in mdsdir_cache:
244 if ino["ino"] == 0x100:
245 continue
246 if len(ino["dirfrags"]) > 1:
247 log.info("straydir 0x{:X} is fragmented".format(ino["ino"]))
248 num += 1;
249 return num
250
251 split_size = 50
252 merge_size = 5
253 split_bits = 1
254
255 self._configure(
256 mds_bal_split_size=split_size,
257 mds_bal_merge_size=merge_size,
258 mds_bal_split_bits=split_bits,
259 mds_bal_fragment_size_max=(split_size * 100)
260 )
261
262 # manually split/merge
263 self.assertEqual(_count_fragmented(), 0)
264 self.fs.mds_asok(["dirfrag", "split", "~mdsdir/stray8", "0/0", "1"])
265 self.fs.mds_asok(["dirfrag", "split", "~mdsdir/stray9", "0/0", "1"])
266 self.wait_until_true(
267 lambda: _count_fragmented() == 2,
268 timeout=30
269 )
270
271 time.sleep(30)
272
273 self.fs.mds_asok(["dirfrag", "merge", "~mdsdir/stray8", "0/0"])
274 self.wait_until_true(
275 lambda: _count_fragmented() == 1,
276 timeout=30
277 )
278
279 time.sleep(30)
280
281 # auto merge
282
283 # merging stray dirs is driven by MDCache::advance_stray()
284 # advance stray dir 10 times
285 for _ in range(10):
286 self.fs.mds_asok(['flush', 'journal'])
287
288 self.wait_until_true(
289 lambda: _count_fragmented() == 0,
290 timeout=30
291 )
292
293 # auto split
294
295 # there are 10 stray dirs. advance stray dir 20 times
296 self.mount_a.create_n_files("testdir1/file", split_size * 20)
297 self.mount_a.run_shell(["mkdir", "testdir2"])
298 testdir1_path = os.path.join(self.mount_a.mountpoint, "testdir1")
299 for i in self.mount_a.ls(testdir1_path):
300 self.mount_a.run_shell(["ln", "testdir1/{0}".format(i), "testdir2/"])
301
302 self.mount_a.umount_wait()
303 self.mount_a.mount_wait()
304 self.mount_a.wait_until_mounted()
305
306 # flush journal and restart mds. after restart, testdir2 is not in mds' cache
307 self.fs.mds_asok(['flush', 'journal'])
308 self.mds_cluster.mds_fail_restart()
309 self.fs.wait_for_daemons()
310 # splitting stray dirs is driven by MDCache::advance_stray()
311 # advance stray dir after unlink 'split_size' files.
312 self.fs.mds_asok(['config', 'set', 'mds_log_events_per_segment', str(split_size)])
313
314 self.assertEqual(_count_fragmented(), 0)
315 self.mount_a.run_shell(["rm", "-rf", "testdir1"])
316 self.wait_until_true(
317 lambda: _count_fragmented() > 0,
318 timeout=30
319 )