import json
import time
import logging
import datetime

import gevent

from textwrap import dedent
from teuthology.orchestra.run import CommandFailedError, Raw
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology

log = logging.getLogger(__name__)
class TestStrays(CephFSTestCase):
    MDSS_REQUIRED = 2

    OPS_THROTTLE = 1
    FILES_THROTTLE = 2

    # Range of different file sizes used in throttle test's workload
    throttle_workload_size_range = 16
    @for_teuthology
    def test_ops_throttle(self):
        self._test_throttling(self.OPS_THROTTLE)

    @for_teuthology
    def test_files_throttle(self):
        self._test_throttling(self.FILES_THROTTLE)
    def test_dir_deletion(self):
        """
        That when deleting a bunch of dentries and the containing
        directory, everything gets purged.

        Catches cases where the client might e.g. fail to trim
        the unlinked dir from its cache.
        """
        file_count = 1000
        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size = {size}
            file_count = {file_count}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_count):
                filename = "{{0}}_{{1}}.bin".format(i, size)
                f = open(os.path.join(mount_path, subdir, filename), 'w')
                f.write(size * 'x')
                f.close()
        """.format(
            mount_path=self.mount_a.mountpoint,
            size=1024,
            file_count=file_count
        ))

        self.mount_a.run_python(create_script)
        # That the dirfrag object is created
        self.fs.mds_asok(["flush", "journal"])
        dir_ino = self.mount_a.path_to_ino("delete_me")
        self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))

        # Remove everything
        self.mount_a.run_shell(["rm", "-rf", "delete_me"])
        self.fs.mds_asok(["flush", "journal"])

        # That all the removed files get created as strays
        strays = self.get_mdc_stat("strays_created")
        self.assertEqual(strays, file_count + 1)

        # That the strays all get enqueued for purge
        self.wait_until_equal(
            lambda: self.get_mdc_stat("strays_enqueued"),
            strays,
            timeout=600
        )

        # That all the purge operations execute
        self.wait_until_equal(
            lambda: self.get_stat("purge_queue", "pq_executed"),
            strays,
            timeout=600
        )

        # That finally, the directory metadata object is gone
        self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))

        # That finally, the data objects are all gone
        self.await_data_pool_empty()
    def _test_throttling(self, throttle_type):
        self.data_log = []
        try:
            return self._do_test_throttling(throttle_type)
        except:
            for l in self.data_log:
                log.info(",".join([l_.__str__() for l_ in l]))
            raise
    def _do_test_throttling(self, throttle_type):
        """
        That the mds_max_purge_ops setting is respected
        """

        def set_throttles(files, ops):
            """
            Helper for updating ops/files limits, and calculating effective
            ops_per_pg setting to give the same ops limit.
            """
            self.set_conf('mds', 'mds_max_purge_files', "%d" % files)
            self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops)

            pgs = self.fs.mon_manager.get_pool_property(
                self.fs.get_data_pool_name(),
                "pg_num"
            )
            ops_per_pg = float(ops) / pgs
            self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg)
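        # Worked example (assumed figures, for illustration only): with ops=16
        # and a data pool of 8 PGs, ops_per_pg = 16 / 8 = 2.0, so the per-PG
        # setting yields the same effective cluster-wide ops limit of 16.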
        # Test conditions depend on what we're going to be exercising.
        # * Lift the threshold on whatever throttle we are *not* testing, so
        #   that the throttle of interest is the one that will be the bottleneck
        # * Create either many small files (test file count throttling) or fewer
        #   large files (test op throttling)
        if throttle_type == self.OPS_THROTTLE:
            set_throttles(files=100000000, ops=16)
            size_unit = 1024 * 1024  # big files, generate lots of ops
            file_multiplier = 100
        elif throttle_type == self.FILES_THROTTLE:
            # The default value of file limit is pretty permissive, so to avoid
            # the test running too fast, create lots of files and set the limit
            # low.
            set_throttles(ops=100000000, files=6)
            size_unit = 1024  # small, numerous files
            file_multiplier = 200
        else:
            raise NotImplementedError(throttle_type)

        # Pick up config changes
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()
        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size_unit = {size_unit}
            file_multiplier = {file_multiplier}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_multiplier):
                for size in xrange(0, {size_range}*size_unit, size_unit):
                    filename = "{{0}}_{{1}}.bin".format(i, size / size_unit)
                    f = open(os.path.join(mount_path, subdir, filename), 'w')
                    f.write(size * 'x')
                    f.close()
        """.format(
            mount_path=self.mount_a.mountpoint,
            size_unit=size_unit,
            file_multiplier=file_multiplier,
            size_range=self.throttle_workload_size_range
        ))

        self.mount_a.run_python(create_script)
        # We will run the deletion in the background, to reduce the risk of it completing before
        # we have started monitoring the stray statistics.
        def background():
            self.mount_a.run_shell(["rm", "-rf", "delete_me"])
            self.fs.mds_asok(["flush", "journal"])

        background_thread = gevent.spawn(background)
        total_inodes = file_multiplier * self.throttle_workload_size_range + 1
        mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds'))
        mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds'))
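        # For instance, in the OPS_THROTTLE case above: file_multiplier=100 and
        # throttle_workload_size_range=16 give 100 * 16 = 1600 files, plus 1
        # for the containing directory, so total_inodes = 1601.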
        # During this phase we look for the concurrent ops to exceed half
        # the limit (a heuristic) and not exceed the limit (a correctness
        # condition)
        purge_timeout = 600
        elapsed = 0
        files_high_water = 0
        ops_high_water = 0
        while True:
            stats = self.fs.mds_asok(['perf', 'dump'])
            mdc_stats = stats['mds_cache']
            pq_stats = stats['purge_queue']
            if elapsed >= purge_timeout:
                raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))

            num_strays = mdc_stats['num_strays']
            num_strays_purging = pq_stats['pq_executing']
            num_purge_ops = pq_stats['pq_executing_ops']
            files_high_water = pq_stats['pq_executing_high_water']
            ops_high_water = pq_stats['pq_executing_ops_high_water']

            self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging,
                                  num_purge_ops, files_high_water, ops_high_water])

            total_strays_created = mdc_stats['strays_created']
            total_strays_purged = pq_stats['pq_executed']

            if total_strays_purged == total_inodes:
                log.info("Complete purge in {0} seconds".format(elapsed))
                break
            elif total_strays_purged > total_inodes:
                raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))
            else:
                if throttle_type == self.OPS_THROTTLE:
                    # 11 is filer_max_purge_ops plus one for the backtrace:
                    # limit is allowed to be overshot by this much.
                    if num_purge_ops > mds_max_purge_ops + 11:
                        raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
                            num_purge_ops, mds_max_purge_ops
                        ))
                elif throttle_type == self.FILES_THROTTLE:
                    if num_strays_purging > mds_max_purge_files:
                        raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
                            num_strays_purging, mds_max_purge_files
                        ))
                else:
                    raise NotImplementedError(throttle_type)

                log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
                    num_strays_purging, num_strays,
                    total_strays_purged, total_strays_created
                ))
                time.sleep(1)
                elapsed += 1

        background_thread.join()
        # Check that we got up to a respectable rate during the purge.  This is totally
        # racy, but should be safeish unless the cluster is pathologically slow, or
        # insanely fast such that the deletions all pass before we have polled the
        # statistics.
        if throttle_type == self.OPS_THROTTLE:
            if ops_high_water < mds_max_purge_ops / 2:
                raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
                    ops_high_water, mds_max_purge_ops
                ))
            # The MDS may go over mds_max_purge_ops for some items, like a
            # heavily fragmented directory.  The throttle does not kick in
            # until *after* we reach or exceed the limit.  This is expected
            # because we don't want to starve the PQ or never purge a
            # particularly large file/directory.
            self.assertLessEqual(ops_high_water, mds_max_purge_ops + 64)
        elif throttle_type == self.FILES_THROTTLE:
            if files_high_water < mds_max_purge_files / 2:
                raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
                    files_high_water, mds_max_purge_files
                ))
            self.assertLessEqual(files_high_water, mds_max_purge_files)
        # Sanity check all MDC stray stats
        stats = self.fs.mds_asok(['perf', 'dump'])
        mdc_stats = stats['mds_cache']
        pq_stats = stats['purge_queue']
        self.assertEqual(mdc_stats['num_strays'], 0)
        self.assertEqual(mdc_stats['num_strays_delayed'], 0)
        self.assertEqual(pq_stats['pq_executing'], 0)
        self.assertEqual(pq_stats['pq_executing_ops'], 0)
        self.assertEqual(mdc_stats['strays_created'], total_inodes)
        self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
        self.assertEqual(pq_stats['pq_executed'], total_inodes)
    def get_mdc_stat(self, name, mds_id=None):
        return self.get_stat("mds_cache", name, mds_id)

    def get_stat(self, subsys, name, mds_id=None):
        return self.fs.mds_asok(['perf', 'dump', subsys, name],
                                mds_id=mds_id)[subsys][name]
    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
                          mds_id=None):
        self.wait_until_equal(
            lambda: self.get_stat(subsys, counter, mds_id),
            expect_val=expect_val, timeout=timeout,
            reject_fn=lambda x: x > expect_val
        )
    def test_open_inode(self):
        """
        That the case of a dentry unlinked while a client holds an
        inode open is handled correctly.

        The inode should be moved into a stray dentry, while the original
        dentry and directory should be purged.

        The inode's data should be purged when the client eventually closes
        it.
        """
        mount_a_client_id = self.mount_a.get_global_id()

        # Write some bytes to a file
        size_mb = 8

        # Hold the file open
        p = self.mount_a.open_background("open_file")
        self.mount_a.write_n_mb("open_file", size_mb)
        open_file_ino = self.mount_a.path_to_ino("open_file")

        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # Unlink the dentry
        self.mount_a.run_shell(["rm", "-f", "open_file"])

        # Wait to see the stray count increment
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)

        # See that while the stray count has incremented, none have passed
        # on to the purge queue
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that the client still holds 2 caps
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # See that the data objects remain in the data pool
        self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))

        # Now close the file
        self.mount_a.kill_background(p)

        # Wait to see the client cap count decrement
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
        )
        # Wait to see the purge counter increment, stray count go to zero
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=6, reject_fn=lambda x: x > 1
        )
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        # See that the data objects no longer exist
        self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))

        self.await_data_pool_empty()
    def test_hardlink_reintegration(self):
        """
        That removal of primary dentry of hardlinked inode results
        in reintegration of inode into the previously-remote dentry,
        rather than lingering as a stray indefinitely.
        """
        # Write some bytes to file_a
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "dir_1"])
        self.mount_a.write_n_mb("dir_1/file_a", size_mb)
        ino = self.mount_a.path_to_ino("dir_1/file_a")

        # Create a hardlink named file_b
        self.mount_a.run_shell(["mkdir", "dir_2"])
        self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
        self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)

        # Flush the journal to get the backtrace written to the data pool
        self.fs.mds_asok(['flush', 'journal'])

        # See that backtrace for the file points to the file_a path
        pre_unlink_bt = self.fs.read_backtrace(ino)
        self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")

        # empty mds cache. otherwise mds reintegrates stray when unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # Unlink file_a
        self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])

        # See that a stray was created
        self.assertEqual(self.get_mdc_stat("num_strays"), 1)
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)

        # Wait, see that data objects are still present (i.e. that the
        # stray did not advance to purging given time)
        time.sleep(30)
        self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that before reintegration, the inode's backtrace points to a stray dir
        self.fs.mds_asok(['flush', 'journal'])
        self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")

        # Do a metadata operation on the remaining link (mv is heavy handed, but
        # others like touch may be satisfied from caps without poking MDS)
        self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the eval_remote call
        # on responding to a client request.
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the remaining link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")

        # mds should reintegrate the stray when the unlink finishes
        self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the notify_stray call
        # on completion of unlink
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the newest link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")

        # Now really delete it
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()
        self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))

        # We caused the inode to go stray 3 times
        self.assertEqual(self.get_mdc_stat("strays_created"), 3)
        # We purged it at the last
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)
    def test_mv_hardlink_cleanup(self):
        """
        That when doing a rename from A to B, and B has hardlinks,
        then we make a stray for B which is then reintegrated
        into one of its hardlinks.
        """
        # Create file_a, file_b, and a hardlink to file_b
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")

        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
        self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)

        # mv file_a file_b
        self.mount_a.run_shell(["mv", "file_a", "file_b"])

        # Stray reintegration should happen as a result of the notify_stray call on
        # completion of rename
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)

        # No data objects should have been deleted, as both files still have linkage.
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
        self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))

        self.fs.mds_asok(['flush', 'journal'])

        post_reint_bt = self.fs.read_backtrace(file_b_ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")
    def _setup_two_ranks(self):
        self.fs.set_max_mds(2)

        # See that we have two active MDSs
        self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
                              reject_fn=lambda v: v > 2 or v < 1)

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]
        log.info("Ranks 0 and 1 are {0} and {1}".format(
            rank_0_id, rank_1_id))

        # Get rid of other MDS daemons so that it's easier to know which
        # daemons to expect in which ranks after restarts
        for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}:
            self.mds_cluster.mds_stop(unneeded_mds)
            self.mds_cluster.mds_fail(unneeded_mds)

        return rank_0_id, rank_1_id
    def _force_migrate(self, to_id, path, watch_ino):
        """
        :param to_id: MDS id to move it to
        :param path: Filesystem path (string) to move
        :param watch_ino: Inode number to look for at destination to confirm move
        """
        # Pin the subtree to rank 1, so that it gets exported there
        self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path])

        # Poll the MDS cache dump to watch for the export completing
        migrated = False
        migrate_timeout = 60
        migrate_elapsed = 0
        while not migrated:
            data = self.fs.mds_asok(["dump", "cache"], to_id)
            for inode_data in data:
                if inode_data['ino'] == watch_ino:
                    log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
                    if inode_data['is_auth'] is True:
                        migrated = True
                    break

            if not migrated:
                if migrate_elapsed > migrate_timeout:
                    raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed))
                else:
                    migrate_elapsed += 1
                    time.sleep(1)
    def _is_stopped(self, rank):
        mds_map = self.fs.get_mds_map()
        return rank not in [i['rank'] for i in mds_map['info'].values()]
    def test_purge_on_shutdown(self):
        """
        That when an MDS rank is shut down, its purge queue is
        drained in the process.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 5

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        self.mount_a.umount_wait()

        # See all the strays go into purge queue
        self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
        self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
        self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)

        # See nothing get purged from the purge queue (yet)
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)

        # Shut down rank 1
        self.fs.set_max_mds(1)

        # It shouldn't proceed past stopping because it's still not allowed
        # to purge
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
        self.assertFalse(self._is_stopped(1))

        # Permit the daemon to start purging again
        self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
                                            'injectargs',
                                            "--mds_max_purge_files 100")

        # It should now proceed through shutdown...
        self.fs.wait_for_daemons(timeout=120)

        # ...and in the process purge all that data
        self.await_data_pool_empty()
    def test_migration_on_shutdown(self):
        """
        That when an MDS rank is shut down, any non-purgeable strays
        get migrated to another rank.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Create a non-purgeable stray in a ~mds1 stray directory
        # by doing a hard link and deleting the original file
        self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"])
        self.mount_a.run_shell(["touch", "dir_1/original"])
        self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"])

        self._force_migrate(rank_1_id, "dir_1",
                            self.mount_a.path_to_ino("dir_1/original"))

        # empty mds cache. otherwise mds reintegrates stray when unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'], rank_0_id)
        self.fs.mds_asok(['flush', 'journal'], rank_1_id)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
        self.mount_a.umount_wait()

        self._wait_for_counter("mds_cache", "strays_created", 1,
                               mds_id=rank_1_id)

        # Shut down rank 1
        self.fs.set_max_mds(1)
        self.fs.wait_for_daemons(timeout=120)

        # See that the stray counter on rank 0 has incremented
        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)
    def assert_backtrace(self, ino, expected_path):
        """
        Assert that the backtrace in the data pool for an inode matches
        an expected /foo/bar path.
        """
        expected_elements = expected_path.strip("/").split("/")
        bt = self.fs.read_backtrace(ino)
        actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']]))
        self.assertListEqual(expected_elements, actual_elements)
    def get_backtrace_path(self, ino):
        bt = self.fs.read_backtrace(ino)
        elements = reversed([dn['dname'] for dn in bt['ancestors']])
        return "/".join(elements)
    def assert_purge_idle(self):
        """
        Assert that the MDS perf counters indicate no strays exist and
        no ongoing purge activity.  Sanity check for when PurgeQueue should
        be idle.
        """
        mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
        pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
        self.assertEqual(mdc_stats["num_strays"], 0)
        self.assertEqual(mdc_stats["num_strays_delayed"], 0)
        self.assertEqual(pq_stats["pq_executing"], 0)
        self.assertEqual(pq_stats["pq_executing_ops"], 0)
    def test_mv_cleanup(self):
        """
        That when doing a rename from A to B, and B has no hardlinks,
        then we make a stray for B and purge it.
        """
        # Create file_a and file_b, write some to both
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")
        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_a")
        self.assert_backtrace(file_b_ino, "file_b")

        # mv file_a file_b
        self.mount_a.run_shell(['mv', 'file_a', 'file_b'])

        # See that stray counter increments
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        # Wait for purge counter to increment
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()

        # file_b should have been purged
        self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))

        # Backtrace should have updated from file_a to file_b
        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_b")

        # file_a's data should still exist
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
    def _pool_df(self, pool_name):
        """
        Return a dict like:
            {
                "kb_used": 0,
                "bytes_used": 0,
                "max_avail": 19630292406,
                "objects": 0
            }

        :param pool_name: Which pool (must exist)
        """
        out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")
        for p in json.loads(out)['pools']:
            if p['name'] == pool_name:
                return p['stats']

        raise RuntimeError("Pool '{0}' not found".format(pool_name))
    def await_data_pool_empty(self):
        self.wait_until_true(
            lambda: self._pool_df(
                self.fs.get_data_pool_name()
            )['objects'] == 0,
            timeout=60)
    def test_snapshot_remove(self):
        """
        That removal of a snapshot that references a now-unlinked file results
        in purging on the stray for the file.
        """
        # Enable snapshots
        self.fs.set_allow_new_snaps(True)

        # Create a dir with a file in it
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "snapdir"])
        self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
        self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
        file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")

        # Snapshot the dir
        self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])

        # Cause the head revision to deviate from the snapshot
        self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)

        # Flush the journal so that backtraces, dirfrag objects will actually be written
        self.fs.mds_asok(["flush", "journal"])

        # Unlink the file and its containing dir
        self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
        self.mount_a.run_shell(["rmdir", "snapdir/subdir"])

        # Unmount the client because when I come back to check the data is still
        # in the file I don't want to just see what's in the page cache.
        self.mount_a.umount_wait()

        self.assertEqual(self.get_mdc_stat("strays_created"), 2)

        # FIXME: at this stage we see a purge and the stray count drops to
        # zero, but there's actually still a stray, so at the very
        # least the StrayManager stats code is slightly off

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # See that the data from the snapshotted revision of the file is still present
        self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)

        # Remove the snapshot
        self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])

        # Purging file_a doesn't happen until after we've flushed the journal, because
        # it is referenced by the snapshotted subdir, and the snapshot isn't really
        # gone until the journal references to it are gone
        self.fs.mds_asok(["flush", "journal"])

        # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs.
        # See also: http://tracker.ceph.com/issues/20072
        self.wait_until_true(
            lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024),
            timeout=60
        )

        # See that a purge happens now
        self._wait_for_counter("mds_cache", "strays_enqueued", 2)
        self._wait_for_counter("purge_queue", "pq_executed", 2)

        self.await_data_pool_empty()
    def test_fancy_layout(self):
        """
        purge stray file with fancy layout
        """
        file_name = "fancy_layout_file"
        self.mount_a.run_shell(["touch", file_name])

        file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
        self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)

        # 35MB requires 7 objects
        size_mb = 35
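        # Why 7: with object_size=8MB and stripe_count=4, one object set holds
        # 4 * 8MB = 32MB across objects 0-3; the remaining 3MB is striped in
        # 1MB stripe units across the first three objects of the next set,
        # for 4 + 3 = 7 objects in total.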
        self.mount_a.write_n_mb(file_name, size_mb)

        self.mount_a.run_shell(["rm", "-f", file_name])
        self.fs.mds_asok(["flush", "journal"])

        # can't use self.fs.data_objects_absent here, it does not support fancy layout
        self.await_data_pool_empty()
    def test_dirfrag_limit(self):
        """
        That the directory fragment size cannot exceed
        mds_bal_fragment_size_max (using a limit of 50 in all configurations).

        That fragmentation (forced) will allow more entries to be created.

        That unlinking fails when the stray directory fragment becomes too
        large, and that unlinking may continue once those strays are purged.
        """
        LOW_LIMIT = 50
        for mds in self.fs.get_daemon_names():
            self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds)
        # Test that creating more entries than the fragment limit fails
        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir")
                os.mkdir(path)
                for n in range(0, {file_count}):
                    open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
                """.format(
                    path=self.mount_a.mountpoint,
                    file_count=LOW_LIMIT + 1
                )))
        except CommandFailedError:
            pass  # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")
        # Now test that we can go beyond the limit if we fragment the directory
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            os.mkdir(path)
            for n in range(0, {file_count}):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            dfd = os.open(path, os.O_DIRECTORY)
            os.fsync(dfd)
            """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT
            )))

        # Ensure that subdir2 is fragmented
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id)

        # remount+flush (release client caps)
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        # Create 50% more files than the current fragment limit
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            for n in range({file_count}, ({file_count}*3)//2):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT
            )))
        # Now test the stray directory size is limited and recovers
        strays_before = self.get_mdc_stat("strays_created")
        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir3")
                os.mkdir(path)
                for n in range({file_count}):
                    fpath = os.path.join(path, "%s" % n)
                    f = open(fpath, 'w')
                    f.write("%s" % n)
                    f.close()
                    os.unlink(fpath)
                """.format(
                    path=self.mount_a.mountpoint,
                    file_count=LOW_LIMIT * 10  # 10 stray directories, should collide before this count
                )))
        except CommandFailedError:
            pass  # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        strays_after = self.get_mdc_stat("strays_created")
        self.assertGreaterEqual(strays_after - strays_before, LOW_LIMIT)

        self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
        self._wait_for_counter("purge_queue", "pq_executed", strays_after)

        # Now that the strays have been purged, unlinking should work again
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir4")
            os.mkdir(path)
            for n in range({file_count}):
                fpath = os.path.join(path, "%s" % n)
                f = open(fpath, 'w')
                f.write("%s" % n)
                f.close()
                os.unlink(fpath)
            """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT
            )))
    def test_purge_queue_upgrade(self):
        """
        That when starting on a system with no purge queue in the metadata
        pool, we silently create one.
        """
        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
        # 500.00000000 is the header object of the purge queue journal
        self.fs.rados(["rm", "500.00000000"])
        self.mds_cluster.mds_restart()
        self.fs.wait_for_daemons()
    def test_replicated_delete_speed(self):
        """
        That deletions of replicated metadata are not pathologically slow
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 10

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        begin = datetime.datetime.now()
        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        end = datetime.datetime.now()

        # What we're really checking here is that we are completing client
        # operations immediately rather than delaying until the next tick.
        tick_period = float(self.fs.get_config("mds_tick_interval", 'mds'))

        duration = (end - begin).total_seconds()
        self.assertLess(duration, (file_count * tick_period) * 0.25)