import time
import json
import logging
import datetime
import gevent

from textwrap import dedent

from teuthology.orchestra.run import CommandFailedError, Raw
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology

log = logging.getLogger(__name__)


class TestStrays(CephFSTestCase):
    MDSS_REQUIRED = 2

    OPS_THROTTLE = 1
    FILES_THROTTLE = 2

    # Range of different file sizes used in throttle test's workload
    throttle_workload_size_range = 16

    @for_teuthology
    def test_ops_throttle(self):
        self._test_throttling(self.OPS_THROTTLE)

    @for_teuthology
    def test_files_throttle(self):
        self._test_throttling(self.FILES_THROTTLE)

    def test_dir_deletion(self):
        """
        That when deleting a bunch of dentries and the containing
        directory, everything gets purged.

        Catches cases where the client might e.g. fail to trim
        the unlinked dir from its cache.
        """
        file_count = 1000
        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size = 1024
            file_count = {file_count}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in range(0, file_count):
                filename = "{{0}}_{{1}}.bin".format(i, size)
                with open(os.path.join(mount_path, subdir, filename), 'w') as f:
                    f.write('0' * size)
            """.format(
            mount_path=self.mount_a.mountpoint,
            file_count=file_count
        ))

        self.mount_a.run_python(create_script)

        # That the dirfrag object is created
        self.fs.mds_asok(["flush", "journal"])
        dir_ino = self.mount_a.path_to_ino("delete_me")
        self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))

        # Delete everything
        self.mount_a.run_shell(["rm", "-rf", "delete_me"])
        self.fs.mds_asok(["flush", "journal"])

        # That all the removed files get created as strays
        strays = self.get_mdc_stat("strays_created")
        self.assertEqual(strays, file_count + 1)

        # That the strays all get enqueued for purge
        self.wait_until_equal(
            lambda: self.get_mdc_stat("strays_enqueued"),
            strays,
            timeout=600
        )

        # That all the purge operations execute
        self.wait_until_equal(
            lambda: self.get_stat("purge_queue", "pq_executed"),
            strays,
            timeout=600
        )

        # That finally, the directory metadata object is gone
        self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))

        # That finally, the data objects are all gone
        self.await_data_pool_empty()

    def _test_throttling(self, throttle_type):
        self.data_log = []
        try:
            return self._do_test_throttling(throttle_type)
        except:
            # On failure, log the recorded stray/purge statistics to aid debugging
            for l in self.data_log:
                log.info(",".join([l_.__str__() for l_ in l]))
            raise

    def _do_test_throttling(self, throttle_type):
        """
        That the mds_max_purge_ops setting is respected
        """

        def set_throttles(files, ops):
            """
            Helper for updating ops/files limits, and calculating effective
            ops_per_pg setting to give the same ops limit.
            """
            self.set_conf('mds', 'mds_max_purge_files', "%d" % files)
            self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops)

            pgs = self.fs.mon_manager.get_pool_int_property(
                self.fs.get_data_pool_name(),
                "pg_num"
            )
            ops_per_pg = float(ops) / pgs
            self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg)
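            # (Assumption: the MDS derives an op limit from
            # mds_max_purge_ops_per_pg scaled by the pool's PG count, so
            # setting it to ops / pg_num keeps the effective limit equal to
            # the `ops` argument.)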

        # Test conditions depend on what we're going to be exercising.
        # * Lift the threshold on whatever throttle we are *not* testing, so
        #   that the throttle of interest is the one that will be the bottleneck
        # * Create either many small files (test file count throttling) or fewer
        #   large files (test op throttling)
        if throttle_type == self.OPS_THROTTLE:
            set_throttles(files=100000000, ops=16)
            size_unit = 1024 * 1024  # big files, generate lots of ops
            file_multiplier = 100
        elif throttle_type == self.FILES_THROTTLE:
            # The default value of the file limit is pretty permissive, so to avoid
            # the test running too fast, create lots of files and set the limit low.
            set_throttles(ops=100000000, files=6)
            size_unit = 1024  # small, numerous files
            file_multiplier = 200
        else:
            raise NotImplementedError(throttle_type)

        # Pick up config changes
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size_unit = {size_unit}
            file_multiplier = {file_multiplier}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in range(0, file_multiplier):
                for size in range(0, {size_range}*size_unit, size_unit):
                    filename = "{{0}}_{{1}}.bin".format(i, size // size_unit)
                    with open(os.path.join(mount_path, subdir, filename), 'w') as f:
                        f.write('0' * size)
            """.format(
            mount_path=self.mount_a.mountpoint,
            size_unit=size_unit,
            file_multiplier=file_multiplier,
            size_range=self.throttle_workload_size_range
        ))

        self.mount_a.run_python(create_script)

        # We will run the deletion in the background, to reduce the risk of it
        # completing before we have started monitoring the stray statistics.
        def background():
            self.mount_a.run_shell(["rm", "-rf", "delete_me"])
            self.fs.mds_asok(["flush", "journal"])

        background_thread = gevent.spawn(background)

        total_inodes = file_multiplier * self.throttle_workload_size_range + 1
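        # (one inode per created file, plus one for the 'delete_me' directory itself)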
        mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds'))
        mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds'))

        # During this phase we look for the concurrent ops to exceed half
        # the limit (a heuristic) and not exceed the limit (a correctness
        # condition).
        purge_timeout = 600
        elapsed = 0
        files_high_water = 0
        ops_high_water = 0

        while True:
            stats = self.fs.mds_asok(['perf', 'dump'])
            mdc_stats = stats['mds_cache']
            pq_stats = stats['purge_queue']
            if elapsed >= purge_timeout:
                raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))

            num_strays = mdc_stats['num_strays']
            num_strays_purging = pq_stats['pq_executing']
            num_purge_ops = pq_stats['pq_executing_ops']
            files_high_water = pq_stats['pq_executing_high_water']
            ops_high_water = pq_stats['pq_executing_ops_high_water']

            self.data_log.append([datetime.datetime.now(), num_strays,
                                  num_strays_purging, num_purge_ops,
                                  files_high_water, ops_high_water])

            total_strays_created = mdc_stats['strays_created']
            total_strays_purged = pq_stats['pq_executed']

            if total_strays_purged == total_inodes:
                log.info("Complete purge in {0} seconds".format(elapsed))
                break
            elif total_strays_purged > total_inodes:
                raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))
            else:
                if throttle_type == self.OPS_THROTTLE:
                    # 11 is filer_max_purge_ops plus one for the backtrace:
                    # limit is allowed to be overshot by this much.
                    if num_purge_ops > mds_max_purge_ops + 11:
                        raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
                            num_purge_ops, mds_max_purge_ops
                        ))
                elif throttle_type == self.FILES_THROTTLE:
                    if num_strays_purging > mds_max_purge_files:
                        raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
                            num_strays_purging, mds_max_purge_files
                        ))
                else:
                    raise NotImplementedError(throttle_type)

                log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
                    num_strays_purging, num_strays,
                    total_strays_purged, total_strays_created
                ))
                time.sleep(1)
                elapsed += 1

        background_thread.join()

        # Check that we got up to a respectable rate during the purge.  This is
        # totally racy, but should be safeish unless the cluster is pathologically
        # slow, or insanely fast such that the deletions all pass before we have
        # polled the statistics in the loop above.
        if throttle_type == self.OPS_THROTTLE:
            if ops_high_water < mds_max_purge_ops // 2:
                raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
                    ops_high_water, mds_max_purge_ops
                ))
            # The MDS may go over mds_max_purge_ops for some items, like a
            # heavily fragmented directory.  The throttle does not kick in
            # until *after* we reach or exceed the limit.  This is expected
            # because we don't want to starve the PQ or never purge a
            # particularly large file/directory.
            self.assertLessEqual(ops_high_water, mds_max_purge_ops + 64)
        elif throttle_type == self.FILES_THROTTLE:
            if files_high_water < mds_max_purge_files // 2:
                raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
                    files_high_water, mds_max_purge_files
                ))
            self.assertLessEqual(files_high_water, mds_max_purge_files)

        # Sanity check all MDC stray stats
        stats = self.fs.mds_asok(['perf', 'dump'])
        mdc_stats = stats['mds_cache']
        pq_stats = stats['purge_queue']
        self.assertEqual(mdc_stats['num_strays'], 0)
        self.assertEqual(mdc_stats['num_strays_delayed'], 0)
        self.assertEqual(pq_stats['pq_executing'], 0)
        self.assertEqual(pq_stats['pq_executing_ops'], 0)
        self.assertEqual(mdc_stats['strays_created'], total_inodes)
        self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
        self.assertEqual(pq_stats['pq_executed'], total_inodes)

    def get_mdc_stat(self, name, mds_id=None):
        return self.get_stat("mds_cache", name, mds_id)

    def get_stat(self, subsys, name, mds_id=None):
        # "perf dump <subsys> <name>" filters the dump to a single counter
        return self.fs.mds_asok(['perf', 'dump', subsys, name],
                                mds_id=mds_id)[subsys][name]

    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
                          mds_id=None):
        self.wait_until_equal(
            lambda: self.get_stat(subsys, counter, mds_id),
            expect_val=expect_val, timeout=timeout,
            reject_fn=lambda x: x > expect_val
        )

    def test_open_inode(self):
        """
        That the case of a dentry unlinked while a client holds an
        inode open is handled correctly.

        The inode should be moved into a stray dentry, while the original
        dentry and directory should be purged.

        The inode's data should be purged when the client eventually closes
        it.
        """
        mount_a_client_id = self.mount_a.get_global_id()

        # Write some bytes to a file
        size_mb = 8

        # Hold the file open
        p = self.mount_a.open_background("open_file")
        self.mount_a.write_n_mb("open_file", size_mb)
        open_file_ino = self.mount_a.path_to_ino("open_file")

        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)
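        # (assumption: the two caps are one for the mount root and one for
        # the open file)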

        # Unlink the file
        self.mount_a.run_shell(["rm", "-f", "open_file"])

        # Wait to see the stray count increment
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)

        # See that while the stray count has incremented, none have passed
        # on to the purge queue
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that the client still holds 2 caps
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # See that the data objects remain in the data pool
        self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))

        # Now close the file: this should allow the purge to proceed
        self.mount_a.kill_background(p)

        # Wait to see the client cap count decrement
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
        )
335 self
._wait
_for
_counter
("mds_cache", "strays_enqueued", 1)
336 self
.wait_until_equal(
337 lambda: self
.get_mdc_stat("num_strays"),
338 expect_val
=0, timeout
=6, reject_fn
=lambda x
: x
> 1
340 self
._wait
_for
_counter
("purge_queue", "pq_executed", 1)

        # See that the data objects no longer exist
        self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))

        self.await_data_pool_empty()

    def test_hardlink_reintegration(self):
        """
        That removal of the primary dentry of a hardlinked inode results
        in reintegration of the inode into the previously-remote dentry,
        rather than lingering as a stray indefinitely.
        """
        # Write some bytes to file_a
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "dir_1"])
        self.mount_a.write_n_mb("dir_1/file_a", size_mb)
        ino = self.mount_a.path_to_ino("dir_1/file_a")

        # Create a hardlink named file_b
        self.mount_a.run_shell(["mkdir", "dir_2"])
        self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
        self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)

        # Flush the journal so that the inode's backtrace gets written out
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the file_a path
        pre_unlink_bt = self.fs.read_backtrace(ino)
        self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")

        # Empty the MDS cache; otherwise the MDS reintegrates the stray as
        # soon as the unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount_wait()

        # Unlink the primary dentry
        self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])

        # See that a stray was created
        self.assertEqual(self.get_mdc_stat("num_strays"), 1)
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)

        # Wait, and see that the data objects are still present (i.e. that the
        # stray did not advance to purging given time)
        time.sleep(30)
        self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that before reintegration, the inode's backtrace points to a stray dir
        self.fs.mds_asok(['flush', 'journal'])
        self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")

        # Do a metadata operation on the remaining link (mv is heavy handed, but
        # others like touch may be satisfied from caps without poking the MDS)
        self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the eval_remote call
        # on responding to a client request.
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the remaining link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")

        # The MDS should reintegrate the stray when the unlink finishes
        self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the notify_stray call
        # on completion of unlink
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the newest link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")

        # Now really delete it
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()
        self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))

        # We caused the inode to go stray 3 times
        self.assertEqual(self.get_mdc_stat("strays_created"), 3)
        # We purged it only at the end
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)

    def test_mv_hardlink_cleanup(self):
        """
        That when doing a rename from A to B, and B has hardlinks,
        then we make a stray for B which is then reintegrated
        into one of its hardlinks.
        """
        # Create file_a, file_b, and a hardlink to file_b
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")

        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
        self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)

        # mv file_a file_b: this should make a stray for the overwritten file_b
        self.mount_a.run_shell(["mv", "file_a", "file_b"])

        # Stray reintegration should happen as a result of the notify_stray call on
        # completion of rename
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)

        # No data objects should have been deleted, as both files still have linkage.
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
        self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))

        self.fs.mds_asok(['flush', 'journal'])

        post_reint_bt = self.fs.read_backtrace(file_b_ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")

    def _setup_two_ranks(self):
        # Set up two MDS ranks
        self.fs.set_max_mds(2)

        # See that we have two active MDSs
        self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
                              reject_fn=lambda v: v > 2 or v < 1)

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]
        log.info("Ranks 0 and 1 are {0} and {1}".format(
            rank_0_id, rank_1_id))

        # Get rid of other MDS daemons so that it's easier to know which
        # daemons to expect in which ranks after restarts
        for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}:
            self.mds_cluster.mds_stop(unneeded_mds)
            self.mds_cluster.mds_fail(unneeded_mds)

        return rank_0_id, rank_1_id

    def _force_migrate(self, to_id, path, watch_ino):
        """
        :param to_id: MDS id to move it to
        :param path: Filesystem path (string) to move
        :param watch_ino: Inode number to look for at destination to confirm move
        """
        self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path])
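        # (Pinning via the ceph.dir.pin vxattr exports the subtree to rank 1;
        # this helper therefore assumes to_id is the daemon holding rank 1.)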

        # Poll the MDS cache dump to watch for the export completing
        migrated = False
        migrate_timeout = 60
        migrate_elapsed = 0
        while not migrated:
            data = self.fs.mds_asok(["dump", "cache"], to_id)
            for inode_data in data:
                if inode_data['ino'] == watch_ino:
                    log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
                    if inode_data['is_auth'] is True:
                        migrated = True
                    break

            if not migrated:
                if migrate_elapsed > migrate_timeout:
                    raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed))
                else:
                    migrate_elapsed += 1
                    time.sleep(1)

    def _is_stopped(self, rank):
        mds_map = self.fs.get_mds_map()
        return rank not in [i['rank'] for i in mds_map['info'].values()]

    def test_purge_on_shutdown(self):
        """
        That when an MDS rank is shut down, its purge queue is
        drained in the process.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 5

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        self.mount_a.umount_wait()

        # See all the strays go into the purge queue
        self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
        self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
        self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)

        # See nothing get purged from the purge queue (yet)
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)

        # Shut down rank 1
        self.fs.set_max_mds(1)

        # It shouldn't proceed past stopping because it's still not allowed
        # to purge
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
        self.assertFalse(self._is_stopped(1))

        # Permit the daemon to start purging again
        self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
                                            'injectargs',
                                            "--mds_max_purge_files 100")

        # It should now proceed through shutdown
        self.fs.wait_for_daemons(timeout=120)

        # ...and in the process purge all that data
        self.await_data_pool_empty()

    def test_migration_on_shutdown(self):
        """
        That when an MDS rank is shut down, any non-purgeable strays
        get migrated to another rank.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Create a non-purgeable stray in a ~mds1 stray directory
        # by doing a hard link and deleting the original file
        self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"])
        self.mount_a.run_shell(["touch", "dir_1/original"])
        self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"])

        self._force_migrate(rank_1_id, "dir_1",
                            self.mount_a.path_to_ino("dir_1/original"))

        # Empty the MDS cache; otherwise the MDS reintegrates the stray as
        # soon as the unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'], rank_0_id)
        self.fs.mds_asok(['flush', 'journal'], rank_1_id)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]

        self.mount_a.mount_wait()

        self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
        self.mount_a.umount_wait()

        self._wait_for_counter("mds_cache", "strays_created", 1,
                               mds_id=rank_1_id)

        # Shut down rank 1
        self.fs.set_max_mds(1)
        self.fs.wait_for_daemons(timeout=120)

        # See that the stray counter on rank 0 has incremented
        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)

    def assert_backtrace(self, ino, expected_path):
        """
        Assert that the backtrace in the data pool for an inode matches
        an expected /foo/bar path.
        """
        expected_elements = expected_path.strip("/").split("/")
        bt = self.fs.read_backtrace(ino)
        actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']]))
        self.assertListEqual(expected_elements, actual_elements)

    def get_backtrace_path(self, ino):
        bt = self.fs.read_backtrace(ino)
        elements = reversed([dn['dname'] for dn in bt['ancestors']])
        return "/".join(elements)

    def assert_purge_idle(self):
        """
        Assert that the MDS perf counters indicate no strays exist and
        no ongoing purge activity.  Sanity check for when the PurgeQueue
        should be idle.
        """
        mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
        pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
        self.assertEqual(mdc_stats["num_strays"], 0)
        self.assertEqual(mdc_stats["num_strays_delayed"], 0)
        self.assertEqual(pq_stats["pq_executing"], 0)
        self.assertEqual(pq_stats["pq_executing_ops"], 0)

    def test_mv_cleanup(self):
        """
        That when doing a rename from A to B, and B has no hardlinks,
        then we make a stray for B and purge it.
        """
        # Create file_a and file_b, write some data to both
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")
        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_a")
        self.assert_backtrace(file_b_ino, "file_b")

        # mv file_a file_b: the overwritten file_b becomes a stray
        self.mount_a.run_shell(['mv', 'file_a', 'file_b'])

        # See that the stray counter increments
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        # Wait for the purge counter to increment
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()

        # file_b should have been purged
        self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))

        # Backtrace should have updated from file_a to file_b
        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_b")

        # file_a's data should still exist
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))

    def _pool_df(self, pool_name):
        """
        Return the stats dict for a pool from `ceph df`, which looks
        something like:
            {
                "kb_used": 0,
                "bytes_used": 0,
                "max_avail": 19630292406,
                "objects": 0
            }

        :param pool_name: Which pool (must exist)
        """
        out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")
        for p in json.loads(out)['pools']:
            if p['name'] == pool_name:
                return p['stats']

        raise RuntimeError("Pool '{0}' not found".format(pool_name))

    def await_data_pool_empty(self):
        self.wait_until_true(
            lambda: self._pool_df(
                self.fs.get_data_pool_name()
            )['objects'] == 0,
            timeout=60)
):
739 That removal of a snapshot that references a now-unlinked file results
740 in purging on the stray for the file.
743 self
.fs
.set_allow_new_snaps(True)

        # Create a dir with a file in it
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "snapdir"])
        self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
        self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
        file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")

        # Snapshot the dir
        self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])

        # Cause the head revision to deviate from the snapshot
        self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)

        # Flush the journal so that backtraces, dirfrag objects will actually be written
        self.fs.mds_asok(["flush", "journal"])

        # Unlink the file and remove its containing directory
        self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
        self.mount_a.run_shell(["rmdir", "snapdir/subdir"])

        # Unmount the client: when I come back to check the data is still in
        # the file, I don't want to just see what's in the page cache.
        self.mount_a.umount_wait()

        self.assertEqual(self.get_mdc_stat("strays_created"), 2)

        # FIXME: at this stage we see a purge and the stray count drops to
        # zero, but there's actually still a stray, so at the very
        # least the StrayManager stats code is slightly off

        self.mount_a.mount_wait()

        # See that the data from the snapshotted revision of the file is still
        # present and correct
        self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)

        # Remove the snapshot
        self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])

        # Purging file_a doesn't happen until after we've flushed the journal, because
        # it is referenced by the snapshotted subdir, and the snapshot isn't really
        # gone until the journal references to it are gone
        self.fs.mds_asok(["flush", "journal"])

        # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs.
        # See also: http://tracker.ceph.com/issues/20072
        self.wait_until_true(
            lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024),
            timeout=60
        )

        # See that a purge happens now
        self._wait_for_counter("mds_cache", "strays_enqueued", 2)
        self._wait_for_counter("purge_queue", "pq_executed", 2)

        self.await_data_pool_empty()

    def test_fancy_layout(self):
        """
        Purge a stray file with a fancy (non-default) layout
        """
        file_name = "fancy_layout_file"
        self.mount_a.run_shell(["touch", file_name])

        file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
        self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)
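        # (With stripe_count=4, data is striped 1MB at a time across four
        # objects per object set, rather than filling one object at a time as
        # the default layout does.)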

        # 35MB requires 7 objects
        size_mb = 35
        self.mount_a.write_n_mb(file_name, size_mb)

        self.mount_a.run_shell(["rm", "-f", file_name])
        self.fs.mds_asok(["flush", "journal"])

        # can't use self.fs.data_objects_absent here, it does not support fancy layout
        self.await_data_pool_empty()

    def test_dirfrag_limit(self):
        """
        That the directory fragment size cannot exceed
        mds_bal_fragment_size_max (using a limit of 50 in all configurations).

        That fragmentation (forced) will allow more entries to be created.

        That unlinking fails when the stray directory fragment becomes too
        large, and that unlinking may continue once those strays are purged.
        """
        LOW_LIMIT = 50
        for mds in self.fs.get_daemon_names():
            self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds)

        # Try to create more files than the configured fragment size limit allows
        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir")
                os.mkdir(path)
                for n in range(0, {file_count}):
                    with open(os.path.join(path, "%s" % n), 'w') as f:
                        f.write(str(n))
                """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT + 1
            )))
        except CommandFailedError:
            pass  # this is expected: the last create should fail
        else:
            raise RuntimeError("fragment size exceeded")

        # Now test that we can go beyond the limit if we fragment the directory
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            os.mkdir(path)
            for n in range(0, {file_count}):
                with open(os.path.join(path, "%s" % n), 'w') as f:
                    f.write(str(n))
            dfd = os.open(path, os.O_DIRECTORY)
            os.fsync(dfd)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

        # Ensure that subdir2 is fragmented
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id)
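        # ("0/0" names the directory's root fragment; splitting it by 1 bit
        # divides it into two fragments.)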

        # remount+flush (release client caps)
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.mount_a.mount_wait()

        # Create 50% more files than the current fragment limit
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            for n in range({file_count}, ({file_count}*3)//2):
                with open(os.path.join(path, "%s" % n), 'w') as f:
                    f.write(str(n))
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

        # Now test that the stray directory size is limited and recovers
        strays_before = self.get_mdc_stat("strays_created")

        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir3")
                os.mkdir(path)
                for n in range({file_count}):
                    fpath = os.path.join(path, "%s" % n)
                    with open(fpath, 'w') as f:
                        f.write(str(n))
                    os.unlink(fpath)
                """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT * 10  # 10 stray directories, should collide before this count
            )))
        except CommandFailedError:
            pass  # this is expected: an unlink should fail
        else:
            raise RuntimeError("fragment size exceeded")

        strays_after = self.get_mdc_stat("strays_created")
        self.assertGreaterEqual(strays_after - strays_before, LOW_LIMIT)

        self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
        self._wait_for_counter("purge_queue", "pq_executed", strays_after)

        # With the strays purged, unlinking should work again
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir4")
            os.mkdir(path)
            for n in range({file_count}):
                fpath = os.path.join(path, "%s" % n)
                with open(fpath, 'w') as f:
                    f.write(str(n))
                os.unlink(fpath)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

    def test_purge_queue_upgrade(self):
        """
        That when starting on a system with no purge queue in the metadata
        pool, we silently create one.
        """
        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
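        # (Assumption: object 500.00000000 is the head object of the purge
        # queue's journal in the metadata pool; removing it simulates a
        # filesystem created before the purge queue existed.)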
        self.fs.rados(["rm", "500.00000000"])
        self.mds_cluster.mds_restart()
        self.fs.wait_for_daemons()

    def test_replicated_delete_speed(self):
        """
        That deletions of replicated metadata are not pathologically slow
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 10

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        begin = datetime.datetime.now()
        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        end = datetime.datetime.now()

        # What we're really checking here is that we are completing client
        # operations immediately rather than delaying until the next tick.
        tick_period = float(self.fs.get_config("mds_tick_interval",
                                               service_type="mds"))

        duration = (end - begin).total_seconds()
        self.assertLess(duration, (file_count * tick_period) * 0.25)