from io import StringIO
import logging
import os
import time

from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.orchestra import run
9 log
= logging
.getLogger(__name__
)
12 class TestFragmentation(CephFSTestCase
):
17 return self
.fs
.mds_asok(['perf', 'dump', 'mds'])['mds']['dir_split']
20 return self
.fs
.mds_asok(['perf', 'dump', 'mds'])['mds']['dir_merge']
22 def get_dir_ino(self
, path
):
23 dir_cache
= self
.fs
.read_cache(path
, 0)
25 dir_inono
= self
.mount_a
.path_to_ino(path
.strip("/"))
27 if ino
['ino'] == dir_inono
:
30 self
.assertIsNotNone(dir_ino
)
33 def _configure(self
, **kwargs
):
35 Apply kwargs as MDS configuration settings, enable dirfrags
39 for k
, v
in kwargs
.items():
40 self
.ceph_cluster
.set_ceph_conf("mds", k
, v
.__str
__())
42 self
.mds_cluster
.mds_fail_restart()
43 self
.fs
.wait_for_daemons()
45 def test_oversize(self
):
47 That a directory is split when it becomes too large.
54 mds_bal_split_size
=split_size
,
55 mds_bal_merge_size
=merge_size
,
59 self
.assertEqual(self
.get_splits(), 0)
61 self
.mount_a
.create_n_files("splitdir/file", split_size
+ 1)
64 lambda: self
.get_splits() == 1,
68 frags
= self
.get_dir_ino("/splitdir")['dirfrags']
69 self
.assertEqual(len(frags
), 2)
70 self
.assertEqual(frags
[0]['dirfrag'], "0x10000000000.0*")
71 self
.assertEqual(frags
[1]['dirfrag'], "0x10000000000.1*")
73 sum([len(f
['dentries']) for f
in frags
]),
77 self
.assertEqual(self
.get_merges(), 0)
79 self
.mount_a
.run_shell(["rm", "-f", run
.Raw("splitdir/file*")])
82 lambda: self
.get_merges() == 1,
86 self
.assertEqual(len(self
.get_dir_ino("/splitdir")["dirfrags"]), 1)
88 def test_rapid_creation(self
):
90 That the fast-splitting limit of 1.5x normal limit is
91 applied when creating dentries quickly.
98 mds_bal_split_size
=split_size
,
99 mds_bal_merge_size
=merge_size
,
100 mds_bal_split_bits
=3,
101 mds_bal_fragment_size_max
=int(split_size
* 1.5 + 2)
104 # We test this only at a single split level. If a client was sending
105 # IO so fast that it hit a second split before the first split
106 # was complete, it could violate mds_bal_fragment_size_max -- there
107 # is a window where the child dirfrags of a split are unfrozen
108 # (so they can grow), but still have STATE_FRAGMENTING (so they
111 # By writing 4x the split size when the split bits are set
112 # to 3 (i.e. 4-ways), I am reasonably sure to see precisely
113 # one split. The test is to check whether that split
114 # happens soon enough that the client doesn't exceed
115 # 2x the split_size (the "immediate" split mode should
116 # kick in at 1.5x the split size).
118 self
.assertEqual(self
.get_splits(), 0)
119 self
.mount_a
.create_n_files("splitdir/file", split_size
* 4)
120 self
.wait_until_equal(
123 reject_fn
=lambda s
: s
> 1,
127 def test_deep_split(self
):
129 That when the directory grows many times larger than split size,
130 the fragments get split again.
134 merge_size
= 1 # i.e. don't merge frag unless its empty
137 branch_factor
= 2**split_bits
139 # Arbitrary: how many levels shall we try fragmenting before
144 mds_bal_split_size
=split_size
,
145 mds_bal_merge_size
=merge_size
,
146 mds_bal_split_bits
=split_bits
149 # Each iteration we will create another level of fragments. The
150 # placement of dentries into fragments is by hashes (i.e. pseudo
151 # random), so we rely on statistics to get the behaviour that
152 # by writing about 1.5x as many dentries as the split_size times
153 # the number of frags, we will get them all to exceed their
154 # split size and trigger a split.
158 while depth
< max_depth
:
159 log
.info("Writing files for depth {0}".format(depth
))
160 target_files
= branch_factor
**depth
* int(split_size
* 1.5)
161 create_files
= target_files
- files_written
163 self
.run_ceph_cmd("log",
164 "{0} Writing {1} files (depth={2})".format(
165 self
.__class
__.__name
__, create_files
, depth
167 self
.mount_a
.create_n_files("splitdir/file_{0}".format(depth
),
169 self
.run_ceph_cmd("log","{0} Done".format(self
.__class
__.__name
__))
171 files_written
+= create_files
172 log
.info("Now have {0} files".format(files_written
))
174 splits_expected
+= branch_factor
**depth
175 log
.info("Waiting to see {0} splits".format(splits_expected
))
177 self
.wait_until_equal(
181 reject_fn
=lambda x
: x
> splits_expected
184 frags
= self
.get_dir_ino("/splitdir")['dirfrags']
185 self
.assertEqual(len(frags
), branch_factor
**(depth
+1))
187 sum([len(f
['dentries']) for f
in frags
]),
191 # On failures, log what fragmentation we actually ended
192 # up with. This block is just for logging, at the end
193 # we raise the exception again.
194 frags
= self
.get_dir_ino("/splitdir")['dirfrags']
195 log
.info("depth={0} splits_expected={1} files_written={2}".format(
196 depth
, splits_expected
, files_written
198 log
.info("Dirfrags:")
200 log
.info("{0}: {1}".format(
201 f
['dirfrag'], len(f
['dentries'])
207 # Remember the inode number because we will be checking for
209 dir_inode_no
= self
.mount_a
.path_to_ino("splitdir")
211 self
.mount_a
.run_shell(["rm", "-rf", "splitdir/"])
212 self
.mount_a
.umount_wait()
214 self
.fs
.mds_asok(['flush', 'journal'])
216 def _check_pq_finished():
217 num_strays
= self
.fs
.mds_asok(['perf', 'dump', 'mds_cache'])['mds_cache']['num_strays']
218 pq_ops
= self
.fs
.mds_asok(['perf', 'dump', 'purge_queue'])['purge_queue']['pq_executing']
219 return num_strays
== 0 and pq_ops
== 0
221 # Wait for all strays to purge
222 self
.wait_until_true(
223 lambda: _check_pq_finished(),
226 # Check that the metadata pool objects for all the myriad
227 # child fragments are gone
228 metadata_objs
= self
.fs
.radosmo(["ls"], stdout
=StringIO()).strip()
230 for o
in metadata_objs
.split("\n"):
231 if o
.startswith("{0:x}.".format(dir_inode_no
)):
233 self
.assertListEqual(frag_objs
, [])
235 def test_split_straydir(self
):
237 That stray dir is split when it becomes too large.
239 def _count_fragmented():
240 mdsdir_cache
= self
.fs
.read_cache("~mdsdir", 1)
242 for ino
in mdsdir_cache
:
243 if ino
["ino"] == 0x100:
245 if len(ino
["dirfrags"]) > 1:
246 log
.info("straydir 0x{:X} is fragmented".format(ino
["ino"]))
255 mds_bal_split_size
=split_size
,
256 mds_bal_merge_size
=merge_size
,
257 mds_bal_split_bits
=split_bits
,
258 mds_bal_fragment_size_max
=(split_size
* 100)
261 # manually split/merge
262 self
.assertEqual(_count_fragmented(), 0)
263 self
.fs
.mds_asok(["dirfrag", "split", "~mdsdir/stray8", "0/0", "1"])
264 self
.fs
.mds_asok(["dirfrag", "split", "~mdsdir/stray9", "0/0", "1"])
265 self
.wait_until_true(
266 lambda: _count_fragmented() == 2,
272 self
.fs
.mds_asok(["dirfrag", "merge", "~mdsdir/stray8", "0/0"])
273 self
.wait_until_true(
274 lambda: _count_fragmented() == 1,
282 # merging stray dirs is driven by MDCache::advance_stray()
283 # advance stray dir 10 times
285 self
.fs
.mds_asok(['flush', 'journal'])
287 self
.wait_until_true(
288 lambda: _count_fragmented() == 0,
294 # there are 10 stray dirs. advance stray dir 20 times
295 self
.mount_a
.create_n_files("testdir1/file", split_size
* 20)
296 self
.mount_a
.run_shell(["mkdir", "testdir2"])
297 testdir1_path
= os
.path
.join(self
.mount_a
.mountpoint
, "testdir1")
298 for i
in self
.mount_a
.ls(testdir1_path
):
299 self
.mount_a
.run_shell(["ln", "testdir1/{0}".format(i
), "testdir2/"])
301 self
.mount_a
.umount_wait()
302 self
.mount_a
.mount_wait()
303 self
.mount_a
.wait_until_mounted()
305 # flush journal and restart mds. after restart, testdir2 is not in mds' cache
306 self
.fs
.mds_asok(['flush', 'journal'])
307 self
.mds_cluster
.mds_fail_restart()
308 self
.fs
.wait_for_daemons()
309 # splitting stray dirs is driven by MDCache::advance_stray()
310 # advance stray dir after unlink 'split_size' files.
311 self
.fs
.mds_asok(['config', 'set', 'mds_log_events_per_segment', str(split_size
)])
313 self
.assertEqual(_count_fragmented(), 0)
314 self
.mount_a
.run_shell(["rm", "-rf", "testdir1"])
315 self
.wait_until_true(
316 lambda: _count_fragmented() > 0,
320 def test_dir_merge_with_snap_items(self
):
322 That directory remain fragmented when snapshot items are taken into account.
327 mds_bal_split_size
=split_size
,
328 mds_bal_merge_size
=merge_size
,
333 create_files
= split_size
+ 50
334 self
.mount_a
.create_n_files("splitdir/file_", create_files
)
336 self
.wait_until_true(
337 lambda: self
.get_splits() == 1,
341 frags
= self
.get_dir_ino("/splitdir")['dirfrags']
342 self
.assertEqual(len(frags
), 2)
343 self
.assertEqual(frags
[0]['dirfrag'], "0x10000000000.0*")
344 self
.assertEqual(frags
[1]['dirfrag'], "0x10000000000.1*")
346 sum([len(f
['dentries']) for f
in frags
]), create_files
349 self
.assertEqual(self
.get_merges(), 0)
351 self
.mount_a
.run_shell(["mkdir", "splitdir/.snap/snap_a"])
352 self
.mount_a
.run_shell(["mkdir", "splitdir/.snap/snap_b"])
353 self
.mount_a
.run_shell(["rm", "-f", run
.Raw("splitdir/file*")])
357 self
.assertEqual(self
.get_merges(), 0)
358 self
.assertEqual(len(self
.get_dir_ino("/splitdir")["dirfrags"]), 2)