4 from textwrap
import dedent
9 from teuthology
.orchestra
.run
import CommandFailedError
, Raw
10 from tasks
.cephfs
.cephfs_test_case
import CephFSTestCase
, for_teuthology
12 log
= logging
.getLogger(__name__
)
15 class TestStrays(CephFSTestCase
):
21 # Range of different file sizes used in throttle test's workload
22 throttle_workload_size_range
= 16
    # NOTE(review): upstream has a @for_teuthology decorator on this test
    # (the import is visible at the top of the file); the decorator line was
    # lost in the paste -- restored, confirm against the upstream file.
    @for_teuthology
    def test_ops_throttle(self):
        """
        That stray purging is throttled by the RADOS-op limit
        (drives _test_throttling in OPS_THROTTLE mode, which bounds
        pq_executing_ops against mds_max_purge_ops).
        """
        self._test_throttling(self.OPS_THROTTLE)
    # NOTE(review): upstream has a @for_teuthology decorator on this test
    # (the import is visible at the top of the file); the decorator line was
    # lost in the paste -- restored, confirm against the upstream file.
    @for_teuthology
    def test_files_throttle(self):
        """
        That stray purging is throttled by the file-count limit
        (drives _test_throttling in FILES_THROTTLE mode, which bounds
        pq_executing against mds_max_purge_files).
        """
        self._test_throttling(self.FILES_THROTTLE)
    def test_dir_deletion(self):
        """
        That when deleting a bunch of dentries and the containing
        directory, everything gets purged.

        Catches cases where the client might e.g. fail to trim
        the unlinked dir from its cache.
        """
        # NOTE(review): several lines of this method were lost in the paste
        # (file_count/size literals, parts of the helper script, and the
        # wait_until_equal arguments); restored below -- confirm against
        # the upstream file.
        file_count = 1000
        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size = {size}
            file_count = {file_count}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_count):
                filename = "{{0}}_{{1}}.bin".format(i, size)
                f = open(os.path.join(mount_path, subdir, filename), 'w')
                f.write(size * 'x')
                f.close()
        """.format(
            mount_path=self.mount_a.mountpoint,
            size=1024,
            file_count=file_count
        ))

        self.mount_a.run_python(create_script)

        # That the dirfrag object is created
        self.fs.mds_asok(["flush", "journal"])
        dir_ino = self.mount_a.path_to_ino("delete_me")
        self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))

        # Remove everything
        self.mount_a.run_shell(["rm", "-rf", "delete_me"])
        self.fs.mds_asok(["flush", "journal"])

        # That all the removed files get created as strays
        strays = self.get_mdc_stat("strays_created")
        self.assertEqual(strays, file_count + 1)  # +1 for the directory itself

        # That the strays all get enqueued for purge
        self.wait_until_equal(
            lambda: self.get_mdc_stat("strays_enqueued"),
            strays,
            timeout=600
        )

        # That all the purge operations execute
        self.wait_until_equal(
            lambda: self.get_stat("purge_queue", "pq_executed"),
            strays,
            timeout=600
        )

        # That finally, the directory metadata object is gone
        self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))

        # That finally, the data objects are all gone
        self.await_data_pool_empty()
    def _test_throttling(self, throttle_type):
        """
        Wrapper around _do_test_throttling: on failure, dump the throttling
        samples collected in self.data_log before re-raising, to aid
        post-mortem debugging of throttle behaviour.
        """
        # NOTE(review): the data_log initialisation and try/except scaffolding
        # were lost in the paste; restored from the visible body (the for-loop
        # over self.data_log only makes sense on the failure path) -- confirm
        # against the upstream file.
        self.data_log = []
        try:
            return self._do_test_throttling(throttle_type)
        except:
            # Emit each sample as a comma-separated line
            for l in self.data_log:
                log.info(",".join([l_.__str__() for l_ in l]))
            raise
104 def _do_test_throttling(self
, throttle_type
):
106 That the mds_max_purge_ops setting is respected
109 def set_throttles(files
, ops
):
111 Helper for updating ops/files limits, and calculating effective
112 ops_per_pg setting to give the same ops limit.
114 self
.set_conf('mds', 'mds_max_purge_files', "%d" % files
)
115 self
.set_conf('mds', 'mds_max_purge_ops', "%d" % ops
)
117 pgs
= self
.fs
.mon_manager
.get_pool_property(
118 self
.fs
.get_data_pool_name(),
121 ops_per_pg
= float(ops
) / pgs
122 self
.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg
)
124 # Test conditions depend on what we're going to be exercising.
125 # * Lift the threshold on whatever throttle we are *not* testing, so
126 # that the throttle of interest is the one that will be the bottleneck
127 # * Create either many small files (test file count throttling) or fewer
128 # large files (test op throttling)
129 if throttle_type
== self
.OPS_THROTTLE
:
130 set_throttles(files
=100000000, ops
=16)
131 size_unit
= 1024 * 1024 # big files, generate lots of ops
132 file_multiplier
= 100
133 elif throttle_type
== self
.FILES_THROTTLE
:
134 # The default value of file limit is pretty permissive, so to avoid
135 # the test running too fast, create lots of files and set the limit
137 set_throttles(ops
=100000000, files
=6)
138 size_unit
= 1024 # small, numerous files
139 file_multiplier
= 200
141 raise NotImplemented(throttle_type
)
143 # Pick up config changes
144 self
.fs
.mds_fail_restart()
145 self
.fs
.wait_for_daemons()
147 create_script
= dedent("""
150 mount_path = "{mount_path}"
152 size_unit = {size_unit}
153 file_multiplier = {file_multiplier}
154 os.mkdir(os.path.join(mount_path, subdir))
155 for i in xrange(0, file_multiplier):
156 for size in xrange(0, {size_range}*size_unit, size_unit):
157 filename = "{{0}}_{{1}}.bin".format(i, size / size_unit)
158 f = open(os.path.join(mount_path, subdir, filename), 'w')
162 mount_path
=self
.mount_a
.mountpoint
,
164 file_multiplier
=file_multiplier
,
165 size_range
=self
.throttle_workload_size_range
168 self
.mount_a
.run_python(create_script
)
170 # We will run the deletion in the background, to reduce the risk of it completing before
171 # we have started monitoring the stray statistics.
173 self
.mount_a
.run_shell(["rm", "-rf", "delete_me"])
174 self
.fs
.mds_asok(["flush", "journal"])
176 background_thread
= gevent
.spawn(background
)
178 total_inodes
= file_multiplier
* self
.throttle_workload_size_range
+ 1
179 mds_max_purge_ops
= int(self
.fs
.get_config("mds_max_purge_ops", 'mds'))
180 mds_max_purge_files
= int(self
.fs
.get_config("mds_max_purge_files", 'mds'))
182 # During this phase we look for the concurrent ops to exceed half
183 # the limit (a heuristic) and not exceed the limit (a correctness
191 stats
= self
.fs
.mds_asok(['perf', 'dump'])
192 mdc_stats
= stats
['mds_cache']
193 pq_stats
= stats
['purge_queue']
194 if elapsed
>= purge_timeout
:
195 raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes
, mdc_stats
))
197 num_strays
= mdc_stats
['num_strays']
198 num_strays_purging
= pq_stats
['pq_executing']
199 num_purge_ops
= pq_stats
['pq_executing_ops']
201 self
.data_log
.append([datetime
.datetime
.now(), num_strays
, num_strays_purging
, num_purge_ops
])
203 files_high_water
= max(files_high_water
, num_strays_purging
)
204 ops_high_water
= max(ops_high_water
, num_purge_ops
)
206 total_strays_created
= mdc_stats
['strays_created']
207 total_strays_purged
= pq_stats
['pq_executed']
209 if total_strays_purged
== total_inodes
:
210 log
.info("Complete purge in {0} seconds".format(elapsed
))
212 elif total_strays_purged
> total_inodes
:
213 raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats
))
215 if throttle_type
== self
.OPS_THROTTLE
:
216 # 11 is filer_max_purge_ops plus one for the backtrace:
217 # limit is allowed to be overshot by this much.
218 if num_purge_ops
> mds_max_purge_ops
+ 11:
219 raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
220 num_purge_ops
, mds_max_purge_ops
222 elif throttle_type
== self
.FILES_THROTTLE
:
223 if num_strays_purging
> mds_max_purge_files
:
224 raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
225 num_strays_purging
, mds_max_purge_files
228 raise NotImplemented(throttle_type
)
230 log
.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
231 num_strays_purging
, num_strays
,
232 total_strays_purged
, total_strays_created
237 background_thread
.join()
239 # Check that we got up to a respectable rate during the purge. This is totally
240 # racy, but should be safeish unless the cluster is pathologically slow, or
241 # insanely fast such that the deletions all pass before we have polled the
243 if throttle_type
== self
.OPS_THROTTLE
:
244 if ops_high_water
< mds_max_purge_ops
/ 2:
245 raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
246 ops_high_water
, mds_max_purge_ops
248 elif throttle_type
== self
.FILES_THROTTLE
:
249 if files_high_water
< mds_max_purge_files
/ 2:
250 raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
251 ops_high_water
, mds_max_purge_files
254 # Sanity check all MDC stray stats
255 stats
= self
.fs
.mds_asok(['perf', 'dump'])
256 mdc_stats
= stats
['mds_cache']
257 pq_stats
= stats
['purge_queue']
258 self
.assertEqual(mdc_stats
['num_strays'], 0)
259 self
.assertEqual(mdc_stats
['num_strays_delayed'], 0)
260 self
.assertEqual(pq_stats
['pq_executing'], 0)
261 self
.assertEqual(pq_stats
['pq_executing_ops'], 0)
262 self
.assertEqual(mdc_stats
['strays_created'], total_inodes
)
263 self
.assertEqual(mdc_stats
['strays_enqueued'], total_inodes
)
264 self
.assertEqual(pq_stats
['pq_executed'], total_inodes
)
    def get_mdc_stat(self, name, mds_id=None):
        """
        Convenience wrapper: fetch perf counter `name` from the
        "mds_cache" subsystem of the given (or default) MDS daemon.
        """
        return self.get_stat("mds_cache", name, mds_id)
    def get_stat(self, subsys, name, mds_id=None):
        """
        Fetch a single perf counter value via the MDS admin socket.

        :param subsys: perf subsystem name, e.g. "mds_cache" or "purge_queue"
        :param name: counter name within the subsystem
        :param mds_id: which MDS daemon to query (None: mds_asok's default)
        """
        return self.fs.mds_asok(['perf', 'dump', subsys, name],
                                mds_id=mds_id)[subsys][name]
    # NOTE(review): the continuation line carrying the mds_id parameter was
    # lost in the paste; restored from the body's use of mds_id -- confirm
    # against the upstream file.
    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
                          mds_id=None):
        """
        Poll until perf counter `subsys`.`counter` equals `expect_val`,
        failing immediately (reject_fn) if the counter overshoots it.
        """
        self.wait_until_equal(
            lambda: self.get_stat(subsys, counter, mds_id),
            expect_val=expect_val, timeout=timeout,
            reject_fn=lambda x: x > expect_val
        )
    def test_open_inode(self):
        """
        That the case of a dentry unlinked while a client holds an
        inode open is handled correctly.

        The inode should be moved into a stray dentry, while the original
        dentry and directory should be purged.

        The inode's data should be purged when the client eventually closes
        it.
        """
        mount_a_client_id = self.mount_a.get_global_id()

        # Write some bytes to a file
        size_mb = 8  # NOTE(review): literal lost in paste -- confirm upstream

        # Hold the file open while we unlink it
        p = self.mount_a.open_background("open_file")
        self.mount_a.write_n_mb("open_file", size_mb)
        open_file_ino = self.mount_a.path_to_ino("open_file")

        # Client holds 2 caps (presumably root dir + the open file -- confirm)
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # Unlink the dentry while the file is still open
        self.mount_a.run_shell(["rm", "-f", "open_file"])

        # Wait to see the stray count increment
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)

        # See that while the stray count has incremented, none have passed
        # on to the purge queue
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that the client still holds 2 caps
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # See that the data objects remain in the data pool
        self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))

        # Now close the file (kill the background holder)
        self.mount_a.kill_background(p)

        # Wait to see the client cap count decrement
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
        )
        # Wait to see the purge counter increment, stray count go to zero
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=6, reject_fn=lambda x: x > 1
        )
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        # See that the data objects no longer exist
        self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))

        self.await_data_pool_empty()
    def test_hardlink_reintegration(self):
        """
        That removal of primary dentry of hardlinked inode results
        in reintegration of inode into the previously-remote dentry,
        rather than lingering as a stray indefinitely.
        """
        # Write some bytes to file_a
        size_mb = 8  # NOTE(review): literal lost in paste -- confirm upstream
        self.mount_a.run_shell(["mkdir", "dir_1"])
        self.mount_a.write_n_mb("dir_1/file_a", size_mb)
        ino = self.mount_a.path_to_ino("dir_1/file_a")

        # Create a hardlink named file_b
        self.mount_a.run_shell(["mkdir", "dir_2"])
        self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
        self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)

        # Flush journal so that backtraces are persisted
        self.fs.mds_asok(['flush', 'journal'])

        # See that backtrace for the file points to the file_a path
        pre_unlink_bt = self.fs.read_backtrace(ino)
        self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")

        # empty mds cache. otherwise mds reintegrates stray when unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        # NOTE(review): the client remount step was lost in the paste;
        # restored (the shell commands below require a mounted client).
        self.mount_a.mount()

        # Unlink the primary dentry
        self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])

        # See that a stray was created
        self.assertEqual(self.get_mdc_stat("num_strays"), 1)
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)

        # Wait, see that data objects are still present (i.e. that the
        # stray did not advance to purging given time)
        time.sleep(30)  # NOTE(review): sleep lost in paste -- confirm duration
        self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that before reintegration, the inode's backtrace points to a stray dir
        self.fs.mds_asok(['flush', 'journal'])
        self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")

        # Do a metadata operation on the remaining link (mv is heavy handed, but
        # others like touch may be satisfied from caps without poking MDS)
        self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the eval_remote call
        # on responding to a client request.
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal to persist the new backtrace
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the remaining link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")

        # mds should reintegrates stray when unlink finishes
        self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the notify_stray call
        # on completion of unlink
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal to persist the new backtrace
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the newest link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")

        # Now really delete it
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()
        self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))

        # We caused the inode to go stray 3 times
        self.assertEqual(self.get_mdc_stat("strays_created"), 3)
        # We purged it at the last
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)
    def test_mv_hardlink_cleanup(self):
        """
        That when doing a rename from A to B, and B has hardlinks,
        then we make a stray for B which is then reintegrated
        into one of his hardlinks.
        """
        # Create file_a, file_b, and a hardlink to file_b
        size_mb = 8  # NOTE(review): literal lost in paste -- confirm upstream
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")

        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
        self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)

        # Rename file_a over file_b: file_b's inode becomes a stray
        self.mount_a.run_shell(["mv", "file_a", "file_b"])

        # Stray reintegration should happen as a result of the notify_stray call on
        # completion of rename
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)

        # No data objects should have been deleted, as both files still have linkage.
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
        self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))

        # Flush journal so the backtrace update is persisted before we read it
        self.fs.mds_asok(['flush', 'journal'])

        post_reint_bt = self.fs.read_backtrace(file_b_ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")
    def _setup_two_ranks(self):
        """
        Raise max_mds to 2, wait for two active ranks, and stop any other
        MDS daemons so that the rank->daemon mapping stays predictable
        across later restarts.

        :return: (rank_0_id, rank_1_id) tuple of daemon names
        """
        # Set up two MDSs
        self.fs.set_max_mds(2)

        # See that we have two active MDSs
        self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
                              reject_fn=lambda v: v > 2 or v < 1)

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]
        log.info("Ranks 0 and 1 are {0} and {1}".format(
            rank_0_id, rank_1_id))

        # Get rid of other MDS daemons so that it's easier to know which
        # daemons to expect in which ranks after restarts
        for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}:
            self.mds_cluster.mds_stop(unneeded_mds)
            self.mds_cluster.mds_fail(unneeded_mds)

        return rank_0_id, rank_1_id
    def _force_migrate(self, to_id, path, watch_ino):
        """
        Force-migrate a subtree to a particular MDS via ceph.dir.pin,
        then poll the destination's cache until it is auth for the inode.

        :param to_id: MDS id to move it to
        :param path: Filesystem path (string) to move
        :param watch_ino: Inode number to look for at destination to confirm move
        """
        # NOTE(review): the polling-loop scaffolding was lost in the paste;
        # restored below -- confirm against the upstream file.
        self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path])

        # Poll the MDS cache dump to watch for the export completing
        migrated = False
        migrate_timeout = 60
        migrate_elapsed = 0
        while not migrated:
            data = self.fs.mds_asok(["dump", "cache"], to_id)
            for inode_data in data:
                if inode_data['ino'] == watch_ino:
                    log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
                    if inode_data['is_auth'] is True:
                        migrated = True
                    break

            if not migrated:
                if migrate_elapsed > migrate_timeout:
                    raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed))
                else:
                    migrate_elapsed += 1
                    time.sleep(1)
546 def _is_stopped(self
, rank
):
547 mds_map
= self
.fs
.get_mds_map()
548 return rank
not in [i
['rank'] for i
in mds_map
['info'].values()]
    def test_purge_on_shutdown(self):
        """
        That when an MDS rank is shut down, its purge queue is
        drained in the process.
        """
        # NOTE(review): several lines were lost in the paste (file_count
        # literal, sleeps, the injectargs argument); restored below --
        # confirm against the upstream file.
        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Forbid rank 1 from purging anything
        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 5  # NOTE(review): literal lost in paste -- confirm
        self.mount_a.create_n_files("delete_me/file", file_count)
        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))
        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        self.mount_a.umount_wait()

        # See all the strays go into purge queue
        self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
        self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
        self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)

        # See nothing get purged from the purge queue (yet)
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)

        # Shut down rank 1
        self.fs.set_max_mds(1)

        # It shouldn't proceed past stopping because its still not allowed
        # to purge
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
        self.assertFalse(self._is_stopped(1))

        # Permit the daemon to start purging again
        self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
                                            'injectargs',
                                            "--mds_max_purge_files 100")

        # It should now proceed through shutdown
        self.fs.wait_for_daemons(timeout=120)

        # ...and in the process purge all that data
        self.await_data_pool_empty()
    def test_migration_on_shutdown(self):
        """
        That when an MDS rank is shut down, any non-purgeable strays
        get migrated to another rank.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Create a non-purgeable stray in a ~mds1 stray directory
        # by doing a hard link and deleting the original file
        self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"])
        self.mount_a.run_shell(["touch", "dir_1/original"])
        self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"])
        self._force_migrate(rank_1_id, "dir_1",
                            self.mount_a.path_to_ino("dir_1/original"))

        # empty mds cache. otherwise mds reintegrates stray when unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'], rank_0_id)
        self.fs.mds_asok(['flush', 'journal'], rank_1_id)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]

        # NOTE(review): the client remount step was lost in the paste;
        # restored (the shell commands below require a mounted client).
        self.mount_a.mount()
        self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
        self.mount_a.umount_wait()

        # Wait for the unlink to become a stray on rank 1
        self._wait_for_counter("mds_cache", "strays_created", 1,
                               mds_id=rank_1_id)

        # Shut down rank 1
        self.fs.set_max_mds(1)
        self.fs.wait_for_daemons(timeout=120)

        # See that the stray counter on rank 0 has incremented
        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)
643 def assert_backtrace(self
, ino
, expected_path
):
645 Assert that the backtrace in the data pool for an inode matches
646 an expected /foo/bar path.
648 expected_elements
= expected_path
.strip("/").split("/")
649 bt
= self
.fs
.read_backtrace(ino
)
650 actual_elements
= list(reversed([dn
['dname'] for dn
in bt
['ancestors']]))
651 self
.assertListEqual(expected_elements
, actual_elements
)
653 def get_backtrace_path(self
, ino
):
654 bt
= self
.fs
.read_backtrace(ino
)
655 elements
= reversed([dn
['dname'] for dn
in bt
['ancestors']])
656 return "/".join(elements
)
    def assert_purge_idle(self):
        """
        Assert that the MDS perf counters indicate no strays exist and
        no ongoing purge activity. Sanity check for when PurgeQueue should
        be idle.
        """
        mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
        pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
        self.assertEqual(mdc_stats["num_strays"], 0)
        self.assertEqual(mdc_stats["num_strays_delayed"], 0)
        self.assertEqual(pq_stats["pq_executing"], 0)
        self.assertEqual(pq_stats["pq_executing_ops"], 0)
    def test_mv_cleanup(self):
        """
        That when doing a rename from A to B, and B has no hardlinks,
        then we make a stray for B and purge him.
        """
        # Create file_a and file_b, write some to both
        size_mb = 8  # NOTE(review): literal lost in paste -- confirm upstream
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")
        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        # Flush so that the initial backtraces are persisted before checking
        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_a")
        self.assert_backtrace(file_b_ino, "file_b")

        # Rename file_a over file_b; file_b's inode becomes a stray
        self.mount_a.run_shell(['mv', 'file_a', 'file_b'])

        # See that stray counter increments
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        # Wait for purge counter to increment
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()

        # file_b should have been purged
        self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))

        # Backtrace should have updated from file_a to file_b
        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_b")

        # file_a's data should still exist
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
    def _pool_df(self, pool_name):
        """
        Return the stats dict for a pool from `ceph df`, e.g.:
            {
                "kb_used": ...,
                "bytes_used": ...,
                "max_avail": 19630292406,
                "objects": ...
            }

        :param pool_name: Which pool (must exist)
        :raises RuntimeError: if the pool is not present in the df output
        """
        out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")
        for p in json.loads(out)['pools']:
            if p['name'] == pool_name:
                # NOTE(review): return statement lost in paste; restored --
                # confirm the 'stats' sub-dict key against the upstream file.
                return p['stats']

        raise RuntimeError("Pool '{0}' not found".format(pool_name))
    def await_data_pool_empty(self):
        """Block until the data pool reports zero objects (all data purged)."""
        # NOTE(review): the tail of this call (the ['objects'] == 0 predicate
        # and timeout) was lost in the paste; restored -- confirm upstream.
        self.wait_until_true(
            lambda: self._pool_df(
                self.fs.get_data_pool_name()
            )['objects'] == 0,
            timeout=60)
    def test_snapshot_remove(self):
        """
        That removal of a snapshot that references a now-unlinked file results
        in purging on the stray for the file.
        """
        # Enable snapshots
        self.fs.set_allow_new_snaps(True)

        # Create a dir with a file in it
        size_mb = 8  # NOTE(review): literal lost in paste -- confirm upstream
        self.mount_a.run_shell(["mkdir", "snapdir"])
        self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
        self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
        file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")

        # Snapshot the dir
        self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])

        # Cause the head revision to deviate from the snapshot
        self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)

        # Flush the journal so that backtraces, dirfrag objects will actually be written
        self.fs.mds_asok(["flush", "journal"])

        # Unlink the file and remove its dir
        self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
        self.mount_a.run_shell(["rmdir", "snapdir/subdir"])

        # Unmount the client because when I come back to check the data is still
        # in the file I don't want to just see what's in the page cache.
        self.mount_a.umount_wait()

        self.assertEqual(self.get_mdc_stat("strays_created"), 2)

        # FIXME: at this stage we see a purge and the stray count drops to
        # zero, but there's actually still a stray, so at the very
        # least the StrayManager stats code is slightly off

        # NOTE(review): the client remount step was lost in the paste;
        # restored (validate_test_pattern below requires a mounted client).
        self.mount_a.mount()

        # See that the data from the snapshotted revision of the file is still present
        self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)

        # Remove the snapshot
        self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])

        # Purging file_a doesn't happen until after we've flushed the journal, because
        # it is referenced by the snapshotted subdir, and the snapshot isn't really
        # gone until the journal references to it are gone
        self.fs.mds_asok(["flush", "journal"])

        # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs.
        # See also: http://tracker.ceph.com/issues/20072
        self.wait_until_true(
            lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024),
            timeout=60
        )

        # See that a purge happens now
        self._wait_for_counter("mds_cache", "strays_enqueued", 2)
        self._wait_for_counter("purge_queue", "pq_executed", 2)

        self.await_data_pool_empty()
    def test_fancy_layout(self):
        """
        purge stray file with fancy layout
        """
        file_name = "fancy_layout_file"
        self.mount_a.run_shell(["touch", file_name])

        # Non-default striping so the purge path has to handle a fancy layout
        file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
        self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)

        # 35MB requires 7 objects
        size_mb = 35  # NOTE(review): literal lost in paste; restored from the comment above
        self.mount_a.write_n_mb(file_name, size_mb)

        self.mount_a.run_shell(["rm", "-f", file_name])
        self.fs.mds_asok(["flush", "journal"])

        # can't use self.fs.data_objects_absent here, it does not support fancy layout
        self.await_data_pool_empty()
    def test_dirfrag_limit(self):
        """
        That the directory fragment size cannot exceed mds_bal_fragment_size_max (using a limit of 50 in all configurations).

        That fragmentation (forced) will allow more entries to be created.

        That unlinking fails when the stray directory fragment becomes too large and that unlinking may continue once those strays are purged.
        """
        # NOTE(review): large parts of this method were lost in the paste
        # (LOW_LIMIT literal, script bodies, try/except scaffolding);
        # restored below -- confirm against the upstream file.
        LOW_LIMIT = 50
        for mds in self.fs.get_daemon_names():
            self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds)

        # Creating LOW_LIMIT + 1 entries in one unfragmented dir must fail
        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir")
                os.mkdir(path)
                for n in range(0, {file_count}):
                    open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
                """.format(
                    path=self.mount_a.mountpoint,
                    file_count=LOW_LIMIT + 1
                )))
        except CommandFailedError:
            pass  # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        # Now test that we can go beyond the limit if we fragment the directory
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            os.mkdir(path)
            for n in range(0, {file_count}):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            dfd = os.open(path, os.O_DIRECTORY)
            os.fsync(dfd)
            """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT
            )))

        # Ensure that subdir2 is fragmented
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id)

        # remount+flush (release client caps)
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # Create 50% more files than the current fragment limit
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            for n in range({file_count}, ({file_count}*3)//2):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT
            )))

        # Now test the stray directory size is limited and recovers
        strays_before = self.get_mdc_stat("strays_created")
        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir3")
                os.mkdir(path)
                for n in range({file_count}):
                    fpath = os.path.join(path, "%s" % n)
                    f = open(fpath, 'w')
                    f.write("%s" % n)
                    f.close()
                    os.unlink(fpath)
                """.format(
                    path=self.mount_a.mountpoint,
                    file_count=LOW_LIMIT * 10  # 10 stray directories, should collide before this count
                )))
        except CommandFailedError:
            pass  # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        strays_after = self.get_mdc_stat("strays_created")
        self.assertGreaterEqual(strays_after - strays_before, LOW_LIMIT)

        self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
        self._wait_for_counter("purge_queue", "pq_executed", strays_after)

        # Unlinking should work again once the strays have been purged
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir4")
            os.mkdir(path)
            for n in range({file_count}):
                fpath = os.path.join(path, "%s" % n)
                f = open(fpath, 'w')
                f.write("%s" % n)
                f.close()
                os.unlink(fpath)
            """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT
            )))
    def test_purge_queue_upgrade(self):
        """
        That when starting on a system with no purge queue in the metadata
        pool, we silently create one.

        :return: None
        """
        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
        # Remove object 500.00000000 -- presumably the purge queue journal
        # header, simulating a pre-purge-queue system -- confirm upstream.
        self.fs.rados(["rm", "500.00000000"])
        self.mds_cluster.mds_restart()
        self.fs.wait_for_daemons()
    def test_replicated_delete_speed(self):
        """
        That deletions of replicated metadata are not pathologically slow
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Forbid rank 1 from purging, so the metadata stays around
        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 10  # NOTE(review): literal lost in paste -- confirm

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        begin = datetime.datetime.now()
        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        end = datetime.datetime.now()

        # What we're really checking here is that we are completing client
        # operations immediately rather than delaying until the next tick.
        # NOTE(review): the second get_config argument was lost in the paste;
        # restored -- confirm against the upstream file.
        tick_period = float(self.fs.get_config("mds_tick_interval",
                                               service_type="mds"))

        duration = (end - begin).total_seconds()
        self.assertLess(duration, (file_count * tick_period) * 0.25)