4 from textwrap
import dedent
8 from teuthology
.exceptions
import CommandFailedError
9 from teuthology
.orchestra
.run
import Raw
10 from tasks
.cephfs
.cephfs_test_case
import CephFSTestCase
, for_teuthology
12 log
= logging
.getLogger(__name__
)
15 class TestStrays(CephFSTestCase
):
21 # Range of different file sizes used in throttle test's workload
22 throttle_workload_size_range
= 16
25 def test_ops_throttle(self
):
26 self
._test
_throttling
(self
.OPS_THROTTLE
)
29 def test_files_throttle(self
):
30 self
._test
_throttling
(self
.FILES_THROTTLE
)
32 def test_dir_deletion(self
):
34 That when deleting a bunch of dentries and the containing
35 directory, everything gets purged.
36 Catches cases where the client might e.g. fail to trim
37 the unlinked dir from its cache.
40 create_script
= dedent("""
43 mountpoint = "{mountpoint}"
46 file_count = {file_count}
47 os.mkdir(os.path.join(mountpoint, subdir))
48 for i in range(0, file_count):
49 filename = "{{0}}_{{1}}.bin".format(i, size)
50 with open(os.path.join(mountpoint, subdir, filename), 'w') as f:
53 mountpoint
=self
.mount_a
.mountpoint
,
58 self
.mount_a
.run_python(create_script
)
60 # That the dirfrag object is created
61 self
.fs
.mds_asok(["flush", "journal"])
62 dir_ino
= self
.mount_a
.path_to_ino("delete_me")
63 self
.assertTrue(self
.fs
.dirfrag_exists(dir_ino
, 0))
66 self
.mount_a
.run_shell(["rm", "-rf", "delete_me"])
67 self
.fs
.mds_asok(["flush", "journal"])
69 # That all the removed files get created as strays
70 strays
= self
.get_mdc_stat("strays_created")
71 self
.assertEqual(strays
, file_count
+ 1)
73 # That the strays all get enqueued for purge
74 self
.wait_until_equal(
75 lambda: self
.get_mdc_stat("strays_enqueued"),
81 # That all the purge operations execute
82 self
.wait_until_equal(
83 lambda: self
.get_stat("purge_queue", "pq_executed"),
88 # That finally, the directory metadata object is gone
89 self
.assertFalse(self
.fs
.dirfrag_exists(dir_ino
, 0))
91 # That finally, the data objects are all gone
92 self
.await_data_pool_empty()
94 def _test_throttling(self
, throttle_type
):
97 return self
._do
_test
_throttling
(throttle_type
)
99 for l
in self
.data_log
:
100 log
.info(",".join([l_
.__str
__() for l_
in l
]))
103 def _do_test_throttling(self
, throttle_type
):
105 That the mds_max_purge_ops setting is respected
108 def set_throttles(files
, ops
):
110 Helper for updating ops/files limits, and calculating effective
111 ops_per_pg setting to give the same ops limit.
113 self
.set_conf('mds', 'mds_max_purge_files', "%d" % files
)
114 self
.set_conf('mds', 'mds_max_purge_ops', "%d" % ops
)
116 pgs
= self
.fs
.mon_manager
.get_pool_int_property(
117 self
.fs
.get_data_pool_name(),
120 ops_per_pg
= float(ops
) / pgs
121 self
.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg
)
123 # Test conditions depend on what we're going to be exercising.
124 # * Lift the threshold on whatever throttle we are *not* testing, so
125 # that the throttle of interest is the one that will be the bottleneck
126 # * Create either many small files (test file count throttling) or fewer
127 # large files (test op throttling)
128 if throttle_type
== self
.OPS_THROTTLE
:
129 set_throttles(files
=100000000, ops
=16)
130 size_unit
= 1024 * 1024 # big files, generate lots of ops
131 file_multiplier
= 100
132 elif throttle_type
== self
.FILES_THROTTLE
:
133 # The default value of file limit is pretty permissive, so to avoid
134 # the test running too fast, create lots of files and set the limit
136 set_throttles(ops
=100000000, files
=6)
137 size_unit
= 1024 # small, numerous files
138 file_multiplier
= 200
140 raise NotImplementedError(throttle_type
)
142 # Pick up config changes
143 self
.fs
.mds_fail_restart()
144 self
.fs
.wait_for_daemons()
146 create_script
= dedent("""
149 mountpoint = "{mountpoint}"
151 size_unit = {size_unit}
152 file_multiplier = {file_multiplier}
153 os.mkdir(os.path.join(mountpoint, subdir))
154 for i in range(0, file_multiplier):
155 for size in range(0, {size_range}*size_unit, size_unit):
156 filename = "{{0}}_{{1}}.bin".format(i, size // size_unit)
157 with open(os.path.join(mountpoint, subdir, filename), 'w') as f:
160 mountpoint
=self
.mount_a
.mountpoint
,
162 file_multiplier
=file_multiplier
,
163 size_range
=self
.throttle_workload_size_range
166 self
.mount_a
.run_python(create_script
)
168 # We will run the deletion in the background, to reduce the risk of it completing before
169 # we have started monitoring the stray statistics.
171 self
.mount_a
.run_shell(["rm", "-rf", "delete_me"])
172 self
.fs
.mds_asok(["flush", "journal"])
174 background_thread
= gevent
.spawn(background
)
176 total_inodes
= file_multiplier
* self
.throttle_workload_size_range
+ 1
177 mds_max_purge_ops
= int(self
.fs
.get_config("mds_max_purge_ops", 'mds'))
178 mds_max_purge_files
= int(self
.fs
.get_config("mds_max_purge_files", 'mds'))
180 # During this phase we look for the concurrent ops to exceed half
181 # the limit (a heuristic) and not exceed the limit (a correctness
189 stats
= self
.fs
.mds_asok(['perf', 'dump'])
190 mdc_stats
= stats
['mds_cache']
191 pq_stats
= stats
['purge_queue']
192 if elapsed
>= purge_timeout
:
193 raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes
, mdc_stats
))
195 num_strays
= mdc_stats
['num_strays']
196 num_strays_purging
= pq_stats
['pq_executing']
197 num_purge_ops
= pq_stats
['pq_executing_ops']
198 files_high_water
= pq_stats
['pq_executing_high_water']
199 ops_high_water
= pq_stats
['pq_executing_ops_high_water']
201 self
.data_log
.append([datetime
.datetime
.now(), num_strays
, num_strays_purging
, num_purge_ops
, files_high_water
, ops_high_water
])
203 total_strays_created
= mdc_stats
['strays_created']
204 total_strays_purged
= pq_stats
['pq_executed']
206 if total_strays_purged
== total_inodes
:
207 log
.info("Complete purge in {0} seconds".format(elapsed
))
209 elif total_strays_purged
> total_inodes
:
210 raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats
))
212 if throttle_type
== self
.OPS_THROTTLE
:
213 # 11 is filer_max_purge_ops plus one for the backtrace:
214 # limit is allowed to be overshot by this much.
215 if num_purge_ops
> mds_max_purge_ops
+ 11:
216 raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
217 num_purge_ops
, mds_max_purge_ops
219 elif throttle_type
== self
.FILES_THROTTLE
:
220 if num_strays_purging
> mds_max_purge_files
:
221 raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
222 num_strays_purging
, mds_max_purge_files
225 raise NotImplementedError(throttle_type
)
227 log
.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
228 num_strays_purging
, num_strays
,
229 total_strays_purged
, total_strays_created
234 background_thread
.join()
236 # Check that we got up to a respectable rate during the purge. This is totally
237 # racy, but should be safeish unless the cluster is pathologically slow, or
238 # insanely fast such that the deletions all pass before we have polled the
240 if throttle_type
== self
.OPS_THROTTLE
:
241 if ops_high_water
< mds_max_purge_ops
// 2:
242 raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
243 ops_high_water
, mds_max_purge_ops
245 # The MDS may go over mds_max_purge_ops for some items, like a
246 # heavily fragmented directory. The throttle does not kick in
247 # until *after* we reach or exceed the limit. This is expected
248 # because we don't want to starve the PQ or never purge a
249 # particularly large file/directory.
250 self
.assertLessEqual(ops_high_water
, mds_max_purge_ops
+64)
251 elif throttle_type
== self
.FILES_THROTTLE
:
252 if files_high_water
< mds_max_purge_files
// 2:
253 raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
254 files_high_water
, mds_max_purge_files
256 self
.assertLessEqual(files_high_water
, mds_max_purge_files
)
258 # Sanity check all MDC stray stats
259 stats
= self
.fs
.mds_asok(['perf', 'dump'])
260 mdc_stats
= stats
['mds_cache']
261 pq_stats
= stats
['purge_queue']
262 self
.assertEqual(mdc_stats
['num_strays'], 0)
263 self
.assertEqual(mdc_stats
['num_strays_delayed'], 0)
264 self
.assertEqual(pq_stats
['pq_executing'], 0)
265 self
.assertEqual(pq_stats
['pq_executing_ops'], 0)
266 self
.assertEqual(mdc_stats
['strays_created'], total_inodes
)
267 self
.assertEqual(mdc_stats
['strays_enqueued'], total_inodes
)
268 self
.assertEqual(pq_stats
['pq_executed'], total_inodes
)
270 def get_mdc_stat(self
, name
, mds_id
=None):
271 return self
.get_stat("mds_cache", name
, mds_id
)
273 def get_stat(self
, subsys
, name
, mds_id
=None):
274 return self
.fs
.mds_asok(['perf', 'dump', subsys
, name
],
275 mds_id
=mds_id
)[subsys
][name
]
277 def _wait_for_counter(self
, subsys
, counter
, expect_val
, timeout
=60,
279 self
.wait_until_equal(
280 lambda: self
.get_stat(subsys
, counter
, mds_id
),
281 expect_val
=expect_val
, timeout
=timeout
,
282 reject_fn
=lambda x
: x
> expect_val
285 def test_open_inode(self
):
287 That the case of a dentry unlinked while a client holds an
288 inode open is handled correctly.
290 The inode should be moved into a stray dentry, while the original
291 dentry and directory should be purged.
293 The inode's data should be purged when the client eventually closes
296 mount_a_client_id
= self
.mount_a
.get_global_id()
298 # Write some bytes to a file
302 p
= self
.mount_a
.open_background("open_file")
303 self
.mount_a
.write_n_mb("open_file", size_mb
)
304 open_file_ino
= self
.mount_a
.path_to_ino("open_file")
306 self
.assertEqual(self
.get_session(mount_a_client_id
)['num_caps'], 2)
309 self
.mount_a
.run_shell(["rm", "-f", "open_file"])
311 # Wait to see the stray count increment
312 self
.wait_until_equal(
313 lambda: self
.get_mdc_stat("num_strays"),
314 expect_val
=1, timeout
=60, reject_fn
=lambda x
: x
> 1)
316 # See that while the stray count has incremented, none have passed
317 # on to the purge queue
318 self
.assertEqual(self
.get_mdc_stat("strays_created"), 1)
319 self
.assertEqual(self
.get_mdc_stat("strays_enqueued"), 0)
321 # See that the client still holds 2 caps
322 self
.assertEqual(self
.get_session(mount_a_client_id
)['num_caps'], 2)
324 # See that the data objects remain in the data pool
325 self
.assertTrue(self
.fs
.data_objects_present(open_file_ino
, size_mb
* 1024 * 1024))
328 self
.mount_a
.kill_background(p
)
330 # Wait to see the client cap count decrement
331 self
.wait_until_equal(
332 lambda: self
.get_session(mount_a_client_id
)['num_caps'],
333 expect_val
=1, timeout
=60, reject_fn
=lambda x
: x
> 2 or x
< 1
335 # Wait to see the purge counter increment, stray count go to zero
336 self
._wait
_for
_counter
("mds_cache", "strays_enqueued", 1)
337 self
.wait_until_equal(
338 lambda: self
.get_mdc_stat("num_strays"),
339 expect_val
=0, timeout
=6, reject_fn
=lambda x
: x
> 1
341 self
._wait
_for
_counter
("purge_queue", "pq_executed", 1)
343 # See that the data objects no longer exist
344 self
.assertTrue(self
.fs
.data_objects_absent(open_file_ino
, size_mb
* 1024 * 1024))
346 self
.await_data_pool_empty()
348 def test_reintegration_limit(self
):
350 That the reintegration is not blocked by full directories.
354 self
.config_set('mds', 'mds_bal_fragment_size_max', str(LOW_LIMIT
))
355 time
.sleep(10) # for config to reach MDS; async create is fast!!
357 last_reintegrated
= self
.get_mdc_stat("strays_reintegrated")
358 self
.mount_a
.run_shell_payload("""
360 for i in `seq 1 50`; do
368 self
.wait_until_equal(
369 lambda: self
.get_mdc_stat("num_strays"),
373 curr_reintegrated
= self
.get_mdc_stat("strays_reintegrated")
374 self
.assertGreater(curr_reintegrated
, last_reintegrated
)
377 def test_hardlink_reintegration(self
):
379 That removal of primary dentry of hardlinked inode results
380 in reintegration of inode into the previously-remote dentry,
381 rather than lingering as a stray indefinitely.
383 # Write some bytes to file_a
385 self
.mount_a
.run_shell(["mkdir", "dir_1"])
386 self
.mount_a
.write_n_mb("dir_1/file_a", size_mb
)
387 ino
= self
.mount_a
.path_to_ino("dir_1/file_a")
389 # Create a hardlink named file_b
390 self
.mount_a
.run_shell(["mkdir", "dir_2"])
391 self
.mount_a
.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
392 self
.assertEqual(self
.mount_a
.path_to_ino("dir_2/file_b"), ino
)
395 self
.fs
.mds_asok(['flush', 'journal'])
397 # See that backtrace for the file points to the file_a path
398 pre_unlink_bt
= self
.fs
.read_backtrace(ino
)
399 self
.assertEqual(pre_unlink_bt
['ancestors'][0]['dname'], "file_a")
401 # empty mds cache. otherwise mds reintegrates stray when unlink finishes
402 self
.mount_a
.umount_wait()
403 self
.fs
.mds_asok(['flush', 'journal'])
404 self
.fs
.mds_fail_restart()
405 self
.fs
.wait_for_daemons()
406 self
.mount_a
.mount_wait()
409 self
.mount_a
.run_shell(["rm", "-f", "dir_1/file_a"])
411 # See that a stray was created
412 self
.assertEqual(self
.get_mdc_stat("num_strays"), 1)
413 self
.assertEqual(self
.get_mdc_stat("strays_created"), 1)
415 # Wait, see that data objects are still present (i.e. that the
416 # stray did not advance to purging given time)
418 self
.assertTrue(self
.fs
.data_objects_present(ino
, size_mb
* 1024 * 1024))
419 self
.assertEqual(self
.get_mdc_stat("strays_enqueued"), 0)
421 # See that before reintegration, the inode's backtrace points to a stray dir
422 self
.fs
.mds_asok(['flush', 'journal'])
423 self
.assertTrue(self
.get_backtrace_path(ino
).startswith("stray"))
425 last_reintegrated
= self
.get_mdc_stat("strays_reintegrated")
427 # Do a metadata operation on the remaining link (mv is heavy handed, but
428 # others like touch may be satisfied from caps without poking MDS)
429 self
.mount_a
.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])
431 # Stray reintegration should happen as a result of the eval_remote call
432 # on responding to a client request.
433 self
.wait_until_equal(
434 lambda: self
.get_mdc_stat("num_strays"),
439 # See the reintegration counter increment
440 curr_reintegrated
= self
.get_mdc_stat("strays_reintegrated")
441 self
.assertGreater(curr_reintegrated
, last_reintegrated
)
442 last_reintegrated
= curr_reintegrated
445 self
.fs
.mds_asok(['flush', 'journal'])
447 # See that the backtrace for the file points to the remaining link's path
448 post_reint_bt
= self
.fs
.read_backtrace(ino
)
449 self
.assertEqual(post_reint_bt
['ancestors'][0]['dname'], "file_c")
451 # mds should reintegrates stray when unlink finishes
452 self
.mount_a
.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
453 self
.mount_a
.run_shell(["rm", "-f", "dir_2/file_c"])
455 # Stray reintegration should happen as a result of the notify_stray call
456 # on completion of unlink
457 self
.wait_until_equal(
458 lambda: self
.get_mdc_stat("num_strays"),
463 # See the reintegration counter increment
464 curr_reintegrated
= self
.get_mdc_stat("strays_reintegrated")
465 self
.assertGreater(curr_reintegrated
, last_reintegrated
)
466 last_reintegrated
= curr_reintegrated
469 self
.fs
.mds_asok(['flush', 'journal'])
471 # See that the backtrace for the file points to the newest link's path
472 post_reint_bt
= self
.fs
.read_backtrace(ino
)
473 self
.assertEqual(post_reint_bt
['ancestors'][0]['dname'], "file_d")
475 # Now really delete it
476 self
.mount_a
.run_shell(["rm", "-f", "dir_2/file_d"])
477 self
._wait
_for
_counter
("mds_cache", "strays_enqueued", 1)
478 self
._wait
_for
_counter
("purge_queue", "pq_executed", 1)
480 self
.assert_purge_idle()
481 self
.assertTrue(self
.fs
.data_objects_absent(ino
, size_mb
* 1024 * 1024))
483 # We caused the inode to go stray 3 times
484 self
.assertEqual(self
.get_mdc_stat("strays_created"), 3)
485 # We purged it at the last
486 self
.assertEqual(self
.get_mdc_stat("strays_enqueued"), 1)
488 def test_reintegration_via_scrub(self
):
490 That reintegration is triggered via recursive scrub.
493 self
.mount_a
.run_shell_payload("""
495 for i in `seq 1 50`; do
502 self
.mount_a
.remount() # drop caps/cache
503 self
.fs
.rank_tell(["flush", "journal"])
505 self
.fs
.wait_for_daemons()
507 # only / in cache, reintegration cannot happen
508 self
.wait_until_equal(
509 lambda: len(self
.fs
.rank_tell(["dump", "tree", "/"])),
514 last_reintegrated
= self
.get_mdc_stat("strays_reintegrated")
515 self
.mount_a
.run_shell_payload("""
519 self
.wait_until_equal(
520 lambda: len(self
.fs
.rank_tell(["dump", "tree", "/"])),
524 self
.assertEqual(self
.get_mdc_stat("num_strays"), 50)
525 curr_reintegrated
= self
.get_mdc_stat("strays_reintegrated")
526 self
.assertEqual(last_reintegrated
, curr_reintegrated
)
528 self
.fs
.rank_tell(["scrub", "start", "/", "recursive,force"])
530 self
.wait_until_equal(
531 lambda: self
.get_mdc_stat("num_strays"),
535 curr_reintegrated
= self
.get_mdc_stat("strays_reintegrated")
536 # N.B.: reintegrate (rename RPC) may be tried multiple times from different code paths
537 self
.assertGreaterEqual(curr_reintegrated
, last_reintegrated
+50)
539 def test_mv_hardlink_cleanup(self
):
541 That when doing a rename from A to B, and B has hardlinks,
542 then we make a stray for B which is then reintegrated
543 into one of his hardlinks.
545 # Create file_a, file_b, and a hardlink to file_b
547 self
.mount_a
.write_n_mb("file_a", size_mb
)
548 file_a_ino
= self
.mount_a
.path_to_ino("file_a")
550 self
.mount_a
.write_n_mb("file_b", size_mb
)
551 file_b_ino
= self
.mount_a
.path_to_ino("file_b")
553 self
.mount_a
.run_shell(["ln", "file_b", "linkto_b"])
554 self
.assertEqual(self
.mount_a
.path_to_ino("linkto_b"), file_b_ino
)
557 self
.mount_a
.run_shell(["mv", "file_a", "file_b"])
559 # Stray reintegration should happen as a result of the notify_stray call on
560 # completion of rename
561 self
.wait_until_equal(
562 lambda: self
.get_mdc_stat("num_strays"),
567 self
.assertEqual(self
.get_mdc_stat("strays_created"), 1)
568 self
.assertGreaterEqual(self
.get_mdc_stat("strays_reintegrated"), 1)
570 # No data objects should have been deleted, as both files still have linkage.
571 self
.assertTrue(self
.fs
.data_objects_present(file_a_ino
, size_mb
* 1024 * 1024))
572 self
.assertTrue(self
.fs
.data_objects_present(file_b_ino
, size_mb
* 1024 * 1024))
574 self
.fs
.mds_asok(['flush', 'journal'])
576 post_reint_bt
= self
.fs
.read_backtrace(file_b_ino
)
577 self
.assertEqual(post_reint_bt
['ancestors'][0]['dname'], "linkto_b")
579 def _setup_two_ranks(self
):
581 self
.fs
.set_max_mds(2)
583 # See that we have two active MDSs
584 self
.wait_until_equal(lambda: len(self
.fs
.get_active_names()), 2, 30,
585 reject_fn
=lambda v
: v
> 2 or v
< 1)
587 active_mds_names
= self
.fs
.get_active_names()
588 rank_0_id
= active_mds_names
[0]
589 rank_1_id
= active_mds_names
[1]
590 log
.info("Ranks 0 and 1 are {0} and {1}".format(
591 rank_0_id
, rank_1_id
))
593 # Get rid of other MDS daemons so that it's easier to know which
594 # daemons to expect in which ranks after restarts
595 for unneeded_mds
in set(self
.mds_cluster
.mds_ids
) - {rank_0_id
, rank_1_id
}:
596 self
.mds_cluster
.mds_stop(unneeded_mds
)
597 self
.mds_cluster
.mds_fail(unneeded_mds
)
599 return rank_0_id
, rank_1_id
601 def _force_migrate(self
, path
, rank
=1):
603 :param to_id: MDS id to move it to
604 :param path: Filesystem path (string) to move
607 self
.mount_a
.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", str(rank
), path
])
609 self
._wait
_subtrees
([(rpath
, rank
)], rank
=rank
, path
=rpath
)
611 def _is_stopped(self
, rank
):
612 mds_map
= self
.fs
.get_mds_map()
613 return rank
not in [i
['rank'] for i
in mds_map
['info'].values()]
615 def test_purge_on_shutdown(self
):
617 That when an MDS rank is shut down, its purge queue is
618 drained in the process.
620 rank_0_id
, rank_1_id
= self
._setup
_two
_ranks
()
622 self
.set_conf("mds.{0}".format(rank_1_id
), 'mds_max_purge_files', "0")
623 self
.mds_cluster
.mds_fail_restart(rank_1_id
)
624 self
.fs
.wait_for_daemons()
628 self
.mount_a
.create_n_files("delete_me/file", file_count
)
630 self
._force
_migrate
("delete_me")
632 self
.mount_a
.run_shell(["rm", "-rf", Raw("delete_me/*")])
633 self
.mount_a
.umount_wait()
635 # See all the strays go into purge queue
636 self
._wait
_for
_counter
("mds_cache", "strays_created", file_count
, mds_id
=rank_1_id
)
637 self
._wait
_for
_counter
("mds_cache", "strays_enqueued", file_count
, mds_id
=rank_1_id
)
638 self
.assertEqual(self
.get_stat("mds_cache", "num_strays", mds_id
=rank_1_id
), 0)
640 # See nothing get purged from the purge queue (yet)
642 self
.assertEqual(self
.get_stat("purge_queue", "pq_executed", mds_id
=rank_1_id
), 0)
645 self
.fs
.set_max_mds(1)
647 # It shouldn't proceed past stopping because its still not allowed
650 self
.assertEqual(self
.get_stat("purge_queue", "pq_executed", mds_id
=rank_1_id
), 0)
651 self
.assertFalse(self
._is
_stopped
(1))
653 # Permit the daemon to start purging again
654 self
.fs
.mon_manager
.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id
),
656 "--mds_max_purge_files 100")
658 # It should now proceed through shutdown
659 self
.fs
.wait_for_daemons(timeout
=120)
661 # ...and in the process purge all that data
662 self
.await_data_pool_empty()
664 def test_migration_on_shutdown(self
):
666 That when an MDS rank is shut down, any non-purgeable strays
667 get migrated to another rank.
670 rank_0_id
, rank_1_id
= self
._setup
_two
_ranks
()
672 # Create a non-purgeable stray in a ~mds1 stray directory
673 # by doing a hard link and deleting the original file
674 self
.mount_a
.run_shell_payload("""
677 ln dir_1/original dir_2/linkto
680 self
._force
_migrate
("dir_1")
681 self
._force
_migrate
("dir_2", rank
=0)
683 # empty mds cache. otherwise mds reintegrates stray when unlink finishes
684 self
.mount_a
.umount_wait()
685 self
.fs
.mds_asok(['flush', 'journal'], rank_1_id
)
686 self
.fs
.mds_asok(['cache', 'drop'], rank_1_id
)
688 self
.mount_a
.mount_wait()
689 self
.mount_a
.run_shell(["rm", "-f", "dir_1/original"])
690 self
.mount_a
.umount_wait()
692 self
._wait
_for
_counter
("mds_cache", "strays_created", 1,
696 self
.fs
.set_max_mds(1)
697 self
.fs
.wait_for_daemons(timeout
=120)
699 # See that the stray counter on rank 0 has incremented
700 self
.assertEqual(self
.get_mdc_stat("strays_created", rank_0_id
), 1)
702 def test_migrate_unlinked_dir(self
):
704 Reproduce https://tracker.ceph.com/issues/53597
706 rank_0_id
, rank_1_id
= self
._setup
_two
_ranks
()
708 self
.mount_a
.run_shell_payload("""
710 touch pin/placeholder
713 self
._force
_migrate
("pin")
715 # Hold the dir open so it cannot be purged
716 p
= self
.mount_a
.open_dir_background("pin/to-be-unlinked")
719 self
.mount_a
.run_shell(["rmdir", "pin/to-be-unlinked"])
721 # Wait to see the stray count increment
722 self
.wait_until_equal(
723 lambda: self
.get_mdc_stat("num_strays", mds_id
=rank_1_id
),
724 expect_val
=1, timeout
=60, reject_fn
=lambda x
: x
> 1)
726 self
.assertEqual(self
.get_mdc_stat("strays_created", mds_id
=rank_1_id
), 1)
727 self
.assertEqual(self
.get_mdc_stat("strays_enqueued", mds_id
=rank_1_id
), 0)
729 # Test loading unlinked dir into cache
730 self
.fs
.mds_asok(['flush', 'journal'], rank_1_id
)
731 self
.fs
.mds_asok(['cache', 'drop'], rank_1_id
)
734 self
.fs
.set_max_mds(1)
735 self
.fs
.wait_for_daemons(timeout
=120)
736 # Now the stray should be migrated to rank 0
737 # self.assertEqual(self.get_mdc_stat("strays_created", mds_id=rank_0_id), 1)
738 # https://github.com/ceph/ceph/pull/44335#issuecomment-1125940158
740 self
.mount_a
.kill_background(p
)
742 def assert_backtrace(self
, ino
, expected_path
):
744 Assert that the backtrace in the data pool for an inode matches
745 an expected /foo/bar path.
747 expected_elements
= expected_path
.strip("/").split("/")
748 bt
= self
.fs
.read_backtrace(ino
)
749 actual_elements
= list(reversed([dn
['dname'] for dn
in bt
['ancestors']]))
750 self
.assertListEqual(expected_elements
, actual_elements
)
752 def get_backtrace_path(self
, ino
):
753 bt
= self
.fs
.read_backtrace(ino
)
754 elements
= reversed([dn
['dname'] for dn
in bt
['ancestors']])
755 return "/".join(elements
)
757 def assert_purge_idle(self
):
759 Assert that the MDS perf counters indicate no strays exist and
760 no ongoing purge activity. Sanity check for when PurgeQueue should
763 mdc_stats
= self
.fs
.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
764 pq_stats
= self
.fs
.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
765 self
.assertEqual(mdc_stats
["num_strays"], 0)
766 self
.assertEqual(mdc_stats
["num_strays_delayed"], 0)
767 self
.assertEqual(pq_stats
["pq_executing"], 0)
768 self
.assertEqual(pq_stats
["pq_executing_ops"], 0)
770 def test_mv_cleanup(self
):
772 That when doing a rename from A to B, and B has no hardlinks,
773 then we make a stray for B and purge him.
775 # Create file_a and file_b, write some to both
777 self
.mount_a
.write_n_mb("file_a", size_mb
)
778 file_a_ino
= self
.mount_a
.path_to_ino("file_a")
779 self
.mount_a
.write_n_mb("file_b", size_mb
)
780 file_b_ino
= self
.mount_a
.path_to_ino("file_b")
782 self
.fs
.mds_asok(['flush', 'journal'])
783 self
.assert_backtrace(file_a_ino
, "file_a")
784 self
.assert_backtrace(file_b_ino
, "file_b")
787 self
.mount_a
.run_shell(['mv', 'file_a', 'file_b'])
789 # See that stray counter increments
790 self
.assertEqual(self
.get_mdc_stat("strays_created"), 1)
791 # Wait for purge counter to increment
792 self
._wait
_for
_counter
("mds_cache", "strays_enqueued", 1)
793 self
._wait
_for
_counter
("purge_queue", "pq_executed", 1)
795 self
.assert_purge_idle()
797 # file_b should have been purged
798 self
.assertTrue(self
.fs
.data_objects_absent(file_b_ino
, size_mb
* 1024 * 1024))
800 # Backtrace should have updated from file_a to file_b
801 self
.fs
.mds_asok(['flush', 'journal'])
802 self
.assert_backtrace(file_a_ino
, "file_b")
804 # file_a's data should still exist
805 self
.assertTrue(self
.fs
.data_objects_present(file_a_ino
, size_mb
* 1024 * 1024))
807 def _pool_df(self
, pool_name
):
813 "max_avail": 19630292406,
817 :param pool_name: Which pool (must exist)
819 out
= self
.fs
.mon_manager
.raw_cluster_cmd("df", "--format=json-pretty")
820 for p
in json
.loads(out
)['pools']:
821 if p
['name'] == pool_name
:
824 raise RuntimeError("Pool '{0}' not found".format(pool_name
))
826 def await_data_pool_empty(self
):
827 self
.wait_until_true(
828 lambda: self
._pool
_df
(
829 self
.fs
.get_data_pool_name()
833 def test_snapshot_remove(self
):
835 That removal of a snapshot that references a now-unlinked file results
836 in purging on the stray for the file.
839 self
.fs
.set_allow_new_snaps(True)
841 # Create a dir with a file in it
843 self
.mount_a
.run_shell(["mkdir", "snapdir"])
844 self
.mount_a
.run_shell(["mkdir", "snapdir/subdir"])
845 self
.mount_a
.write_test_pattern("snapdir/subdir/file_a", size_mb
* 1024 * 1024)
846 file_a_ino
= self
.mount_a
.path_to_ino("snapdir/subdir/file_a")
849 self
.mount_a
.run_shell(["mkdir", "snapdir/.snap/snap1"])
851 # Cause the head revision to deviate from the snapshot
852 self
.mount_a
.write_n_mb("snapdir/subdir/file_a", size_mb
)
854 # Flush the journal so that backtraces, dirfrag objects will actually be written
855 self
.fs
.mds_asok(["flush", "journal"])
858 self
.mount_a
.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
859 self
.mount_a
.run_shell(["rmdir", "snapdir/subdir"])
861 # Unmount the client because when I come back to check the data is still
862 # in the file I don't want to just see what's in the page cache.
863 self
.mount_a
.umount_wait()
865 self
.assertEqual(self
.get_mdc_stat("strays_created"), 2)
867 # FIXME: at this stage we see a purge and the stray count drops to
868 # zero, but there's actually still a stray, so at the very
869 # least the StrayManager stats code is slightly off
871 self
.mount_a
.mount_wait()
873 # See that the data from the snapshotted revision of the file is still present
875 self
.mount_a
.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb
* 1024 * 1024)
877 # Remove the snapshot
878 self
.mount_a
.run_shell(["rmdir", "snapdir/.snap/snap1"])
880 # Purging file_a doesn't happen until after we've flushed the journal, because
881 # it is referenced by the snapshotted subdir, and the snapshot isn't really
882 # gone until the journal references to it are gone
883 self
.fs
.mds_asok(["flush", "journal"])
885 # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs.
886 # See also: http://tracker.ceph.com/issues/20072
887 self
.wait_until_true(
888 lambda: self
.fs
.data_objects_absent(file_a_ino
, size_mb
* 1024 * 1024),
892 # See that a purge happens now
893 self
._wait
_for
_counter
("mds_cache", "strays_enqueued", 2)
894 self
._wait
_for
_counter
("purge_queue", "pq_executed", 2)
896 self
.await_data_pool_empty()
898 def test_fancy_layout(self
):
900 purge stray file with fancy layout
903 file_name
= "fancy_layout_file"
904 self
.mount_a
.run_shell(["touch", file_name
])
906 file_layout
= "stripe_unit=1048576 stripe_count=4 object_size=8388608"
907 self
.mount_a
.setfattr(file_name
, "ceph.file.layout", file_layout
)
909 # 35MB requires 7 objects
911 self
.mount_a
.write_n_mb(file_name
, size_mb
)
913 self
.mount_a
.run_shell(["rm", "-f", file_name
])
914 self
.fs
.mds_asok(["flush", "journal"])
916 # can't use self.fs.data_objects_absent here, it does not support fancy layout
917 self
.await_data_pool_empty()
919 def test_dirfrag_limit(self
):
921 That the directory fragment size cannot exceed mds_bal_fragment_size_max (using a limit of 50 in all configurations).
925 self
.config_set('mds', 'mds_bal_fragment_size_max', str(LOW_LIMIT
))
926 time
.sleep(10) # for config to reach MDS; async create is fast!!
929 self
.mount_a
.create_n_files("subdir/file", LOW_LIMIT
+1, finaldirsync
=True)
930 except CommandFailedError
:
933 self
.fail("fragment size exceeded")
936 def test_dirfrag_limit_fragmented(self
):
938 That fragmentation (forced) will allow more entries to be created.
942 self
.config_set('mds', 'mds_bal_fragment_size_max', str(LOW_LIMIT
))
943 self
.config_set('mds', 'mds_bal_merge_size', 1) # disable merging
944 time
.sleep(10) # for config to reach MDS; async create is fast!!
946 # Test that we can go beyond the limit if we fragment the directory
947 self
.mount_a
.create_n_files("subdir/file", LOW_LIMIT
, finaldirsync
=True)
948 self
.mount_a
.umount_wait() # release client caps
950 # Ensure that subdir is fragmented
951 self
.fs
.rank_asok(["dirfrag", "split", "/subdir", "0/0", "1"])
952 self
.fs
.rank_asok(["flush", "journal"])
954 # Create 50% more files than the current fragment limit
955 self
.mount_a
.mount_wait()
956 self
.mount_a
.create_n_files("subdir/file", (LOW_LIMIT
*3)//2, finaldirsync
=True)
958 def test_dirfrag_limit_strays(self
):
960 That unlinking fails when the stray directory fragment becomes too
961 large and that unlinking may continue once those strays are purged.
965 # N.B. this test is inherently racy because stray removal may be faster
966 # than slow(er) file creation.
967 self
.config_set('mds', 'mds_bal_fragment_size_max', LOW_LIMIT
)
968 time
.sleep(10) # for config to reach MDS; async create is fast!!
970 # Now test the stray directory size is limited and recovers
971 strays_before
= self
.get_mdc_stat("strays_created")
973 # 10 stray directories: expect collisions
974 self
.mount_a
.create_n_files("subdir/file", LOW_LIMIT
*10, finaldirsync
=True, unlink
=True)
975 except CommandFailedError
:
978 self
.fail("fragment size exceeded")
979 strays_after
= self
.get_mdc_stat("strays_created")
980 self
.assertGreaterEqual(strays_after
-strays_before
, LOW_LIMIT
)
982 self
._wait
_for
_counter
("mds_cache", "strays_enqueued", strays_after
)
983 self
._wait
_for
_counter
("purge_queue", "pq_executed", strays_after
)
985 # verify new files can be created and unlinked
986 self
.mount_a
.create_n_files("subdir/file", LOW_LIMIT
, dirsync
=True, unlink
=True)
988 def test_purge_queue_upgrade(self
):
990 That when starting on a system with no purge queue in the metadata
991 pool, we silently create one.
995 self
.mds_cluster
.mds_stop()
996 self
.mds_cluster
.mds_fail()
997 self
.fs
.radosm(["rm", "500.00000000"])
998 self
.mds_cluster
.mds_restart()
999 self
.fs
.wait_for_daemons()
1001 def test_replicated_delete_speed(self
):
1003 That deletions of replicated metadata are not pathologically slow
1005 rank_0_id
, rank_1_id
= self
._setup
_two
_ranks
()
1007 self
.set_conf("mds.{0}".format(rank_1_id
), 'mds_max_purge_files', "0")
1008 self
.mds_cluster
.mds_fail_restart(rank_1_id
)
1009 self
.fs
.wait_for_daemons()
1013 self
.mount_a
.create_n_files("delete_me/file", file_count
)
1015 self
._force
_migrate
("delete_me")
1017 begin
= datetime
.datetime
.now()
1018 self
.mount_a
.run_shell(["rm", "-rf", Raw("delete_me/*")])
1019 end
= datetime
.datetime
.now()
1021 # What we're really checking here is that we are completing client
1022 # operations immediately rather than delaying until the next tick.
1023 tick_period
= float(self
.fs
.get_config("mds_tick_interval",
1024 service_type
="mds"))
1026 duration
= (end
- begin
).total_seconds()
1027 self
.assertLess(duration
, (file_count
* tick_period
) * 0.25)