import json
import time
import logging
import datetime
from textwrap import dedent

import gevent

from teuthology.exceptions import CommandFailedError
from teuthology.orchestra.run import Raw
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology

log = logging.getLogger(__name__)


class TestStrays(CephFSTestCase):
    MDSS_REQUIRED = 2

    # Sentinels selecting which purge throttle _test_throttling exercises
    # (the exact values only need to differ from each other).
    OPS_THROTTLE = 1
    FILES_THROTTLE = 2

    # Range of different file sizes used in throttle test's workload
    throttle_workload_size_range = 16
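
    # The two throttle tests run the same workload under different purge
    # limits: OPS_THROTTLE caps concurrent purge RADOS operations
    # (mds_max_purge_ops), FILES_THROTTLE caps concurrently purging files
    # (mds_max_purge_files).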
    @for_teuthology
    def test_ops_throttle(self):
        self._test_throttling(self.OPS_THROTTLE)

    @for_teuthology
    def test_files_throttle(self):
        self._test_throttling(self.FILES_THROTTLE)

    def test_dir_deletion(self):
        """
        That when deleting a bunch of dentries and the containing
        directory, everything gets purged.

        Catches cases where the client might e.g. fail to trim
        the unlinked dir from its cache.
        """
        file_count = 1000
        create_script = dedent("""
            import os

            mountpoint = "{mountpoint}"
            subdir = "delete_me"
            size = {size}
            file_count = {file_count}
            os.mkdir(os.path.join(mountpoint, subdir))
            for i in range(0, file_count):
                filename = "{{0}}_{{1}}.bin".format(i, size)
                with open(os.path.join(mountpoint, subdir, filename), 'w') as f:
                    f.write(size * 'x')
        """.format(
            mountpoint=self.mount_a.mountpoint,
            size=1024,
            file_count=file_count
        ))

        self.mount_a.run_python(create_script)

        # That the dirfrag object is created
        self.fs.mds_asok(["flush", "journal"])
        dir_ino = self.mount_a.path_to_ino("delete_me")
        self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))

        # Remove everything
        self.mount_a.run_shell(["rm", "-rf", "delete_me"])
        self.fs.mds_asok(["flush", "journal"])

        # That all the removed files get created as strays
        strays = self.get_mdc_stat("strays_created")
        self.assertEqual(strays, file_count + 1)

        # That the strays all get enqueued for purge
        self.wait_until_equal(
            lambda: self.get_mdc_stat("strays_enqueued"),
            expect_val=strays,
            timeout=600
        )

        # That all the purge operations execute
        self.wait_until_equal(
            lambda: self.get_stat("purge_queue", "pq_executed"),
            expect_val=strays,
            timeout=600
        )

        # That finally, the directory metadata object is gone
        self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))

        # That finally, the data objects are all gone
        self.await_data_pool_empty()

    def _test_throttling(self, throttle_type):
        self.data_log = []
        try:
            return self._do_test_throttling(throttle_type)
        except:
            for l in self.data_log:
                log.info(",".join([l_.__str__() for l_ in l]))
            raise

    def _do_test_throttling(self, throttle_type):
        """
        That the mds_max_purge_ops setting is respected
        """

        def set_throttles(files, ops):
            """
            Helper for updating ops/files limits, and calculating effective
            ops_per_pg setting to give the same ops limit.
            """
            self.set_conf('mds', 'mds_max_purge_files', "%d" % files)
            self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops)
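
            # The MDS also applies a per-PG cap: roughly ops_per_pg multiplied
            # by the number of PGs in the data pool.  Deriving ops_per_pg from
            # the requested ops limit keeps both caps equivalent, so only the
            # throttle under test acts as the bottleneck.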
            pgs = self.fs.mon_manager.get_pool_int_property(
                self.fs.get_data_pool_name(),
                "pg_num"
            )
            ops_per_pg = float(ops) / pgs
            self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg)

        # Test conditions depend on what we're going to be exercising.
        # * Lift the threshold on whatever throttle we are *not* testing, so
        #   that the throttle of interest is the one that will be the bottleneck
        # * Create either many small files (test file count throttling) or fewer
        #   large files (test op throttling)
        if throttle_type == self.OPS_THROTTLE:
            set_throttles(files=100000000, ops=16)
            size_unit = 1024 * 1024  # big files, generate lots of ops
            file_multiplier = 100
        elif throttle_type == self.FILES_THROTTLE:
            # The default value of file limit is pretty permissive, so to avoid
            # the test running too fast, create lots of files and set the limit
            # low.
            set_throttles(ops=100000000, files=6)
            size_unit = 1024  # small, numerous files
            file_multiplier = 200
        else:
            raise NotImplementedError(throttle_type)

        # Pick up config changes
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        create_script = dedent("""
            import os

            mountpoint = "{mountpoint}"
            subdir = "delete_me"
            size_unit = {size_unit}
            file_multiplier = {file_multiplier}
            os.mkdir(os.path.join(mountpoint, subdir))
            for i in range(0, file_multiplier):
                for size in range(0, {size_range}*size_unit, size_unit):
                    filename = "{{0}}_{{1}}.bin".format(i, size // size_unit)
                    with open(os.path.join(mountpoint, subdir, filename), 'w') as f:
                        f.write(size * 'x')
        """.format(
            mountpoint=self.mount_a.mountpoint,
            size_unit=size_unit,
            file_multiplier=file_multiplier,
            size_range=self.throttle_workload_size_range
        ))

        self.mount_a.run_python(create_script)

        # We will run the deletion in the background, to reduce the risk of it completing before
        # we have started monitoring the stray statistics.
        def background():
            self.mount_a.run_shell(["rm", "-rf", "delete_me"])
            self.fs.mds_asok(["flush", "journal"])

        background_thread = gevent.spawn(background)
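
        # The workload created file_multiplier * size_range files inside
        # "delete_me"; the trailing +1 accounts for the directory itself,
        # which also becomes a stray when it is removed.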
        total_inodes = file_multiplier * self.throttle_workload_size_range + 1
        mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds'))
        mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds'))

        # During this phase we look for the concurrent ops to exceed half
        # the limit (a heuristic) and not exceed the limit (a correctness
        # condition).
        purge_timeout = 600
        elapsed = 0
        files_high_water = 0
        ops_high_water = 0

        while True:
            stats = self.fs.mds_asok(['perf', 'dump'])
            mdc_stats = stats['mds_cache']
            pq_stats = stats['purge_queue']
            if elapsed >= purge_timeout:
                raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))

            num_strays = mdc_stats['num_strays']
            num_strays_purging = pq_stats['pq_executing']
            num_purge_ops = pq_stats['pq_executing_ops']
            files_high_water = pq_stats['pq_executing_high_water']
            ops_high_water = pq_stats['pq_executing_ops_high_water']

            self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging,
                                  num_purge_ops, files_high_water, ops_high_water])

            total_strays_created = mdc_stats['strays_created']
            total_strays_purged = pq_stats['pq_executed']

            if total_strays_purged == total_inodes:
                log.info("Complete purge in {0} seconds".format(elapsed))
                break
            elif total_strays_purged > total_inodes:
                raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))

            if throttle_type == self.OPS_THROTTLE:
                # 11 is filer_max_purge_ops plus one for the backtrace:
                # limit is allowed to be overshot by this much.
                if num_purge_ops > mds_max_purge_ops + 11:
                    raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
                        num_purge_ops, mds_max_purge_ops
                    ))
            elif throttle_type == self.FILES_THROTTLE:
                if num_strays_purging > mds_max_purge_files:
                    raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
                        num_strays_purging, mds_max_purge_files
                    ))
            else:
                raise NotImplementedError(throttle_type)

            log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
                num_strays_purging, num_strays,
                total_strays_purged, total_strays_created
            ))
            time.sleep(1)
            elapsed += 1

        background_thread.join()

        # Check that we got up to a respectable rate during the purge.  This is totally
        # racy, but should be safeish unless the cluster is pathologically slow, or
        # insanely fast such that the deletions all pass before we have polled the
        # statistics.
        if throttle_type == self.OPS_THROTTLE:
            if ops_high_water < mds_max_purge_ops // 2:
                raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
                    ops_high_water, mds_max_purge_ops
                ))
            # The MDS may go over mds_max_purge_ops for some items, like a
            # heavily fragmented directory.  The throttle does not kick in
            # until *after* we reach or exceed the limit.  This is expected
            # because we don't want to starve the PQ or never purge a
            # particularly large file/directory.
            self.assertLessEqual(ops_high_water, mds_max_purge_ops + 64)
        elif throttle_type == self.FILES_THROTTLE:
            if files_high_water < mds_max_purge_files // 2:
                raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
                    files_high_water, mds_max_purge_files
                ))
            self.assertLessEqual(files_high_water, mds_max_purge_files)

        # Sanity check all MDC stray stats
        stats = self.fs.mds_asok(['perf', 'dump'])
        mdc_stats = stats['mds_cache']
        pq_stats = stats['purge_queue']

        self.assertEqual(mdc_stats['num_strays'], 0)
        self.assertEqual(mdc_stats['num_strays_delayed'], 0)
        self.assertEqual(pq_stats['pq_executing'], 0)
        self.assertEqual(pq_stats['pq_executing_ops'], 0)
        self.assertEqual(mdc_stats['strays_created'], total_inodes)
        self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
        self.assertEqual(pq_stats['pq_executed'], total_inodes)

    def get_mdc_stat(self, name, mds_id=None):
        return self.get_stat("mds_cache", name, mds_id)

    def get_stat(self, subsys, name, mds_id=None):
        return self.fs.mds_asok(['perf', 'dump', subsys, name],
                                mds_id=mds_id)[subsys][name]

    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
                          mds_id=None):
        self.wait_until_equal(
            lambda: self.get_stat(subsys, counter, mds_id),
            expect_val=expect_val, timeout=timeout,
            reject_fn=lambda x: x > expect_val
        )

    def test_open_inode(self):
        """
        That the case of a dentry unlinked while a client holds an
        inode open is handled correctly.

        The inode should be moved into a stray dentry, while the original
        dentry and directory should be purged.

        The inode's data should be purged when the client eventually closes
        it.
        """
        mount_a_client_id = self.mount_a.get_global_id()

        # Write some bytes to a file
        size_mb = 8

        # Hold the file open
        p = self.mount_a.open_background("open_file")
        self.mount_a.write_n_mb("open_file", size_mb)
        open_file_ino = self.mount_a.path_to_ino("open_file")

        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # Unlink the file
        self.mount_a.run_shell(["rm", "-f", "open_file"])

        # Wait to see the stray count increment
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)

        # See that while the stray count has incremented, none have passed
        # on to the purge queue
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that the client still holds 2 caps
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # See that the data objects remain in the data pool
        self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))

        # Now close the file
        self.mount_a.kill_background(p)

        # Wait to see the client cap count decrement
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
        )

        # Wait to see the purge counter increment, stray count go to zero
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=6, reject_fn=lambda x: x > 1
        )
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        # See that the data objects no longer exist
        self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))

        self.await_data_pool_empty()

    def test_reintegration_limit(self):
        """
        That the reintegration is not blocked by full directories.
        """
        LOW_LIMIT = 50
        self.config_set('mds', 'mds_bal_fragment_size_max', str(LOW_LIMIT))
        time.sleep(10)  # for config to reach MDS; async create is fast!!

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.mount_a.run_shell_payload("""
        mkdir -p a b
        for i in `seq 1 50`; do
           touch a/"$i"
           ln a/"$i" b/"$i"
           rm a/"$i"
        done
        """)

        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)

    def test_hardlink_reintegration(self):
        """
        That removal of primary dentry of hardlinked inode results
        in reintegration of inode into the previously-remote dentry,
        rather than lingering as a stray indefinitely.
        """
        # Write some bytes to file_a
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "dir_1"])
        self.mount_a.write_n_mb("dir_1/file_a", size_mb)
        ino = self.mount_a.path_to_ino("dir_1/file_a")

        # Create a hardlink named file_b
        self.mount_a.run_shell(["mkdir", "dir_2"])
        self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
        self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)

        self.fs.mds_asok(['flush', 'journal'])

        # See that backtrace for the file points to the file_a path
        pre_unlink_bt = self.fs.read_backtrace(ino)
        self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")

        # empty mds cache. otherwise mds reintegrates stray when unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount_wait()

        # Unlink file_a
        self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])

        # See that a stray was created
        self.assertEqual(self.get_mdc_stat("num_strays"), 1)
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)

        # Wait, see that data objects are still present (i.e. that the
        # stray did not advance to purging given time)
        time.sleep(30)
        self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that before reintegration, the inode's backtrace points to a stray dir
        self.fs.mds_asok(['flush', 'journal'])
        self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")

        # Do a metadata operation on the remaining link (mv is heavy handed, but
        # others like touch may be satisfied from caps without poking MDS)
        self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the eval_remote call
        # on responding to a client request.
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal so that the backtrace is updated on disk
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the remaining link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")

        # mds should reintegrate the stray when unlink finishes
        self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the notify_stray call
        # on completion of unlink
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal again so that the backtrace is updated on disk
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the newest link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")

        # Now really delete it
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()
        self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))

        # We caused the inode to go stray 3 times
        self.assertEqual(self.get_mdc_stat("strays_created"), 3)
        # We purged it once, at the end
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)

    def test_reintegration_via_scrub(self):
        """
        That reintegration is triggered via recursive scrub.
        """
        self.mount_a.run_shell_payload("""
        mkdir -p a b
        for i in `seq 1 50`; do
           touch a/"$i"
           ln a/"$i" b/"$i"
        done
        """)

        self.mount_a.remount()  # drop caps/cache
        self.fs.rank_tell(["flush", "journal"])
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        # only / in cache, reintegration cannot happen
        self.wait_until_equal(
            lambda: len(self.fs.rank_tell(["dump", "tree", "/"])),
            expect_val=3,
            timeout=60
        )

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.mount_a.run_shell_payload("""
        rm -f a/*
        """)
        self.wait_until_equal(
            lambda: len(self.fs.rank_tell(["dump", "tree", "/"])),
            expect_val=3,
            timeout=60
        )

        self.assertEqual(self.get_mdc_stat("num_strays"), 50)
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertEqual(last_reintegrated, curr_reintegrated)

        self.fs.rank_tell(["scrub", "start", "/", "recursive,force"])

        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        # N.B.: reintegrate (rename RPC) may be tried multiple times from different code paths
        self.assertGreaterEqual(curr_reintegrated, last_reintegrated + 50)

    def test_mv_hardlink_cleanup(self):
        """
        That when doing a rename from A to B, and B has hardlinks,
        then we make a stray for B which is then reintegrated
        into one of its hardlinks.
        """
        # Create file_a, file_b, and a hardlink to file_b
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")

        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
        self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)

        # mv file_a file_b
        self.mount_a.run_shell(["mv", "file_a", "file_b"])

        # Stray reintegration should happen as a result of the notify_stray call on
        # completion of rename
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)

        # No data objects should have been deleted, as both files still have linkage.
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
        self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))

        self.fs.mds_asok(['flush', 'journal'])

        post_reint_bt = self.fs.read_backtrace(file_b_ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")

    def _setup_two_ranks(self):
        self.fs.set_max_mds(2)

        # See that we have two active MDSs
        self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
                              reject_fn=lambda v: v > 2 or v < 1)

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]
        log.info("Ranks 0 and 1 are {0} and {1}".format(
            rank_0_id, rank_1_id))

        # Get rid of other MDS daemons so that it's easier to know which
        # daemons to expect in which ranks after restarts
        for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}:
            self.mds_cluster.mds_stop(unneeded_mds)
            self.mds_cluster.mds_fail(unneeded_mds)

        return rank_0_id, rank_1_id

    def _force_migrate(self, path, rank=1):
        """
        :param path: Filesystem path (string) to move
        :param rank: MDS rank to pin the path to
        :return: None
        """
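        # Setting the ceph.dir.pin vxattr pins the directory's subtree to the
        # given MDS rank, forcing an export of "path" from its current rank.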
        self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", str(rank), path])
        rpath = "/" + path
        self._wait_subtrees([(rpath, rank)], rank=rank, path=rpath)

    def _is_stopped(self, rank):
        mds_map = self.fs.get_mds_map()
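        # Active ranks are listed in the mds_map 'info' table; once a rank has
        # finished stopping it disappears from that table.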
        return rank not in [i['rank'] for i in mds_map['info'].values()]

    def test_purge_on_shutdown(self):
        """
        That when an MDS rank is shut down, its purge queue is
        drained in the process.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 5

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate("delete_me")

        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        self.mount_a.umount_wait()

        # See all the strays go into purge queue
        self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
        self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
        self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)

        # See nothing get purged from the purge queue (yet)
        time.sleep(30)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)

        # Shut down rank 1
        self.fs.set_max_mds(1)

        # It shouldn't proceed past stopping because it's still not allowed
        # to purge
        time.sleep(30)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
        self.assertFalse(self._is_stopped(1))

        # Permit the daemon to start purging again
        self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
                                            'injectargs',
                                            "--mds_max_purge_files 100")

        # It should now proceed through shutdown
        self.fs.wait_for_daemons(timeout=120)

        # ...and in the process purge all that data
        self.await_data_pool_empty()

    def test_migration_on_shutdown(self):
        """
        That when an MDS rank is shut down, any non-purgeable strays
        get migrated to another rank.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Create a non-purgeable stray in a ~mds1 stray directory
        # by doing a hard link and deleting the original file
        self.mount_a.run_shell_payload("""
        mkdir dir_1 dir_2
        touch dir_1/original
        ln dir_1/original dir_2/linkto
        """)

        self._force_migrate("dir_1")
        self._force_migrate("dir_2", rank=0)

        # empty mds cache. otherwise mds reintegrates stray when unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'], rank_1_id)
        self.fs.mds_asok(['cache', 'drop'], rank_1_id)

        self.mount_a.mount_wait()
        self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
        self.mount_a.umount_wait()

        self._wait_for_counter("mds_cache", "strays_created", 1,
                               mds_id=rank_1_id)

        # Shut down rank 1
        self.fs.set_max_mds(1)
        self.fs.wait_for_daemons(timeout=120)

        # See that the stray counter on rank 0 has incremented
        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)

    def assert_backtrace(self, ino, expected_path):
        """
        Assert that the backtrace in the data pool for an inode matches
        an expected /foo/bar path.
        """
        expected_elements = expected_path.strip("/").split("/")
        bt = self.fs.read_backtrace(ino)
        actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']]))
        self.assertListEqual(expected_elements, actual_elements)

    def get_backtrace_path(self, ino):
        bt = self.fs.read_backtrace(ino)
        elements = reversed([dn['dname'] for dn in bt['ancestors']])
        return "/".join(elements)

    def assert_purge_idle(self):
        """
        Assert that the MDS perf counters indicate no strays exist and
        no ongoing purge activity.  Sanity check for when PurgeQueue should
        be idle.
        """
        mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
        pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
        self.assertEqual(mdc_stats["num_strays"], 0)
        self.assertEqual(mdc_stats["num_strays_delayed"], 0)
        self.assertEqual(pq_stats["pq_executing"], 0)
        self.assertEqual(pq_stats["pq_executing_ops"], 0)

    def test_mv_cleanup(self):
        """
        That when doing a rename from A to B, and B has no hardlinks,
        then we make a stray for B and purge it.
        """
        # Create file_a and file_b, write some to both
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")
        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_a")
        self.assert_backtrace(file_b_ino, "file_b")

        # mv file_a file_b
        self.mount_a.run_shell(['mv', 'file_a', 'file_b'])

        # See that stray counter increments
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        # Wait for purge counter to increment
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()

        # file_b should have been purged
        self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))

        # Backtrace should have updated from file_a to file_b
        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_b")

        # file_a's data should still exist
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))

    def _pool_df(self, pool_name):
        """
        Return the pool's 'stats' dict from `ceph df`, e.g.:
            {
                "kb_used": 0,
                "bytes_used": 0,
                "max_avail": 19630292406,
                "objects": 0
            }

        :param pool_name: Which pool (must exist)
        """
        out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")
        for p in json.loads(out)['pools']:
            if p['name'] == pool_name:
                return p['stats']

        raise RuntimeError("Pool '{0}' not found".format(pool_name))

    def await_data_pool_empty(self):
        self.wait_until_true(
            lambda: self._pool_df(
                self.fs.get_data_pool_name()
            )['objects'] == 0,
            timeout=60)

    def test_snapshot_remove(self):
        """
        That removal of a snapshot that references a now-unlinked file results
        in purging on the stray for the file.
        """
        # Enable snapshots
        self.fs.set_allow_new_snaps(True)

        # Create a dir with a file in it
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "snapdir"])
        self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
        self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
        file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")

        # Snapshot the dir
        self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])

        # Cause the head revision to deviate from the snapshot
        self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)

        # Flush the journal so that backtraces, dirfrag objects will actually be written
        self.fs.mds_asok(["flush", "journal"])

        # Unlink the file and remove its parent dir
        self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
        self.mount_a.run_shell(["rmdir", "snapdir/subdir"])

        # Unmount the client because when I come back to check the data is still
        # in the file I don't want to just see what's in the page cache.
        self.mount_a.umount_wait()
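
        # Both the unlinked file and its removed parent directory become
        # strays, hence strays_created == 2 below.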
        self.assertEqual(self.get_mdc_stat("strays_created"), 2)

        # FIXME: at this stage we see a purge and the stray count drops to
        # zero, but there's actually still a stray, so at the very
        # least the StrayManager stats code is slightly off

        self.mount_a.mount_wait()

        # See that the data from the snapshotted revision of the file is still present
        # and correct
        self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)

        # Remove the snapshot
        self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])

        # Purging file_a doesn't happen until after we've flushed the journal, because
        # it is referenced by the snapshotted subdir, and the snapshot isn't really
        # gone until the journal references to it are gone
        self.fs.mds_asok(["flush", "journal"])

        # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs.
        # See also: http://tracker.ceph.com/issues/20072
        self.wait_until_true(
            lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024),
            timeout=60
        )

        # See that a purge happens now
        self._wait_for_counter("mds_cache", "strays_enqueued", 2)
        self._wait_for_counter("purge_queue", "pq_executed", 2)

        self.await_data_pool_empty()

    def test_fancy_layout(self):
        """
        purge stray file with fancy layout
        """
        file_name = "fancy_layout_file"
        self.mount_a.run_shell(["touch", file_name])

        file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
        self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)

        # 35MB requires 7 objects
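        # (with stripe_unit=1M and stripe_count=4, the first 32MB fills one
        # complete stripe set of four 8MB objects; the remaining 3MB places
        # one 1MB stripe unit on each of three further objects, giving 7)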
        size_mb = 35
        self.mount_a.write_n_mb(file_name, size_mb)

        self.mount_a.run_shell(["rm", "-f", file_name])
        self.fs.mds_asok(["flush", "journal"])

        # can't use self.fs.data_objects_absent here, it does not support fancy layout
        self.await_data_pool_empty()

    def test_dirfrag_limit(self):
        """
        That the directory fragment size cannot exceed mds_bal_fragment_size_max
        (using a limit of 50 in all configurations).
        """
        LOW_LIMIT = 50
        self.config_set('mds', 'mds_bal_fragment_size_max', str(LOW_LIMIT))
        time.sleep(10)  # for config to reach MDS; async create is fast!!

        try:
            self.mount_a.create_n_files("subdir/file", LOW_LIMIT + 1, finaldirsync=True)
        except CommandFailedError:
            pass  # ENOSPC
        else:
            self.fail("fragment size exceeded")

    def test_dirfrag_limit_fragmented(self):
        """
        That fragmentation (forced) will allow more entries to be created.
        """
        LOW_LIMIT = 50
        self.config_set('mds', 'mds_bal_fragment_size_max', str(LOW_LIMIT))
        self.config_set('mds', 'mds_bal_merge_size', 1)  # disable merging
        time.sleep(10)  # for config to reach MDS; async create is fast!!

        # Test that we can go beyond the limit if we fragment the directory
        self.mount_a.create_n_files("subdir/file", LOW_LIMIT, finaldirsync=True)
        self.mount_a.umount_wait()  # release client caps

        # Ensure that subdir is fragmented
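        # "dirfrag split <path> <frag> <bits>" forcibly splits fragment 0/0 of
        # /subdir by one bit into two child fragments, so the per-fragment
        # size limit now applies to each half separately.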
        self.fs.rank_asok(["dirfrag", "split", "/subdir", "0/0", "1"])
        self.fs.rank_asok(["flush", "journal"])

        # Create 50% more files than the current fragment limit
        self.mount_a.mount_wait()
        self.mount_a.create_n_files("subdir/file", (LOW_LIMIT * 3) // 2, finaldirsync=True)

    def test_dirfrag_limit_strays(self):
        """
        That unlinking fails when the stray directory fragment becomes too
        large and that unlinking may continue once those strays are purged.
        """
        LOW_LIMIT = 10
        # N.B. this test is inherently racy because stray removal may be faster
        # than slow(er) file creation.
        self.config_set('mds', 'mds_bal_fragment_size_max', LOW_LIMIT)
        time.sleep(10)  # for config to reach MDS; async create is fast!!

        # Now test the stray directory size is limited and recovers
        strays_before = self.get_mdc_stat("strays_created")
        try:
            # 10 stray directories: expect collisions
            self.mount_a.create_n_files("subdir/file", LOW_LIMIT * 10, finaldirsync=True, unlink=True)
        except CommandFailedError:
            pass  # ENOSPC
        else:
            self.fail("fragment size exceeded")
        strays_after = self.get_mdc_stat("strays_created")
        self.assertGreaterEqual(strays_after - strays_before, LOW_LIMIT)

        self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
        self._wait_for_counter("purge_queue", "pq_executed", strays_after)

        # verify new files can be created and unlinked
        self.mount_a.create_n_files("subdir/file", LOW_LIMIT, dirsync=True, unlink=True)

    def test_purge_queue_upgrade(self):
        """
        That when starting on a system with no purge queue in the metadata
        pool, we silently create one.
        """
        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
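        # 500.00000000 is the head object of the purge queue journal (reserved
        # inode 0x500) in the metadata pool; deleting it simulates a filesystem
        # created before the purge queue was introduced.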
        self.fs.radosm(["rm", "500.00000000"])
        self.mds_cluster.mds_restart()
        self.fs.wait_for_daemons()

    @for_teuthology
    def test_replicated_delete_speed(self):
        """
        That deletions of replicated metadata are not pathologically slow
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 10

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate("delete_me")

        begin = datetime.datetime.now()
        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        end = datetime.datetime.now()

        # What we're really checking here is that we are completing client
        # operations immediately rather than delaying until the next tick.
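        # If every unlink waited for an MDS tick, the deletion would take on
        # the order of file_count * tick_period seconds; requiring well under
        # a quarter of that shows completions are not tick-bound.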
        tick_period = float(self.fs.get_config("mds_tick_interval",
                                               service_type="mds"))

        duration = (end - begin).total_seconds()
        self.assertLess(duration, (file_count * tick_period) * 0.25)