import datetime
import json
import logging
import time

import gevent

from textwrap import dedent

from teuthology.orchestra.run import CommandFailedError, Raw
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology

log = logging.getLogger(__name__)


class TestStrays(CephFSTestCase):
    # The multi-rank tests below assume at least two MDS daemons are available
    MDSS_REQUIRED = 2

    # Identifiers passed to _test_throttling() to select which purge throttle
    # to exercise (the values are arbitrary, they just need to be distinct)
    OPS_THROTTLE = 1
    FILES_THROTTLE = 2

    # Range of different file sizes used in throttle test's workload
    throttle_workload_size_range = 16
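
    # Each throttle-test file's size steps through multiples of size_unit in
    # [0, throttle_workload_size_range), as generated by _do_test_throttling().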

    def test_ops_throttle(self):
        self._test_throttling(self.OPS_THROTTLE)

    def test_files_throttle(self):
        self._test_throttling(self.FILES_THROTTLE)

    def test_dir_deletion(self):
        """
        That when deleting a bunch of dentries and the containing
        directory, everything gets purged.

        Catches cases where the client might e.g. fail to trim
        the unlinked dir from its cache.
        """
        file_count = 100  # assumed workload size
        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size = 1024  # bytes per file (assumed)
            file_count = {file_count}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_count):
                filename = "{{0}}_{{1}}.bin".format(i, size)
                f = open(os.path.join(mount_path, subdir, filename), 'w')
                f.write(size * 'x')
                f.close()
        """.format(
            mount_path=self.mount_a.mountpoint,
            file_count=file_count
        ))

        self.mount_a.run_python(create_script)

        # That the dirfrag object is created
        self.fs.mds_asok(["flush", "journal"])
        dir_ino = self.mount_a.path_to_ino("delete_me")
        self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))
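
        # Remove the files and the containing directory, then flush so the
        # unlink events reach the MDS's stray handling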
        self.mount_a.run_shell(["rm", "-rf", "delete_me"])
        self.fs.mds_asok(["flush", "journal"])

        # That all the removed files get created as strays
        strays = self.get_mdc_stat("strays_created")
        self.assertEqual(strays, file_count + 1)

        # That the strays all get enqueued for purge
        self.wait_until_equal(
            lambda: self.get_mdc_stat("strays_enqueued"),
            expect_val=strays, timeout=600
        )

        # That all the purge operations execute
        self.wait_until_equal(
            lambda: self.get_stat("purge_queue", "pq_executed"),
            expect_val=strays, timeout=600
        )

        # That finally, the directory metadata object is gone
        self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))

        # That finally, the data objects are all gone
        self.await_data_pool_empty()

    def _test_throttling(self, throttle_type):
        self.data_log = []
        try:
            return self._do_test_throttling(throttle_type)
        except Exception:
            for l in self.data_log:
                log.info(",".join([l_.__str__() for l_ in l]))
            raise

    def _do_test_throttling(self, throttle_type):
        """
        That the mds_max_purge_ops setting is respected
        """

        def set_throttles(files, ops):
            """
            Helper for updating ops/files limits, and calculating effective
            ops_per_pg setting to give the same ops limit.
            """
            self.set_conf('mds', 'mds_max_purge_files', "%d" % files)
            self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops)

            pgs = self.fs.mon_manager.get_pool_property(
                self.fs.get_data_pool_name(),
                "pg_num"
            )
            ops_per_pg = float(ops) / pgs
            self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg)

        # Test conditions depend on what we're going to be exercising.
        # * Lift the threshold on whatever throttle we are *not* testing, so
        #   that the throttle of interest is the one that will be the bottleneck
        # * Create either many small files (test file count throttling) or fewer
        #   large files (test op throttling)
        if throttle_type == self.OPS_THROTTLE:
            set_throttles(files=100000000, ops=16)
            size_unit = 1024 * 1024  # big files, generate lots of ops
            file_multiplier = 100
        elif throttle_type == self.FILES_THROTTLE:
            # The default value of file limit is pretty permissive, so to avoid
            # the test running too fast, create lots of files and set the limit
            # low.
            set_throttles(ops=100000000, files=6)
            size_unit = 1024  # small, numerous files
            file_multiplier = 200
        else:
            raise NotImplementedError(throttle_type)

        # Pick up config changes
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()
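
        # Generate the workload: file_multiplier batches of files whose sizes
        # step through the configured size range, so whichever throttle is
        # under test has plenty of work to do.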
        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size_unit = {size_unit}
            file_multiplier = {file_multiplier}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_multiplier):
                for size in xrange(0, {size_range}*size_unit, size_unit):
                    filename = "{{0}}_{{1}}.bin".format(i, size / size_unit)
                    f = open(os.path.join(mount_path, subdir, filename), 'w')
                    f.write(size * 'x')
                    f.close()
        """.format(
            mount_path=self.mount_a.mountpoint,
            size_unit=size_unit,
            file_multiplier=file_multiplier,
            size_range=self.throttle_workload_size_range
        ))

        self.mount_a.run_python(create_script)

        # We will run the deletion in the background, to reduce the risk of it completing before
        # we have started monitoring the stray statistics.
        def background():
            self.mount_a.run_shell(["rm", "-rf", "delete_me"])
            self.fs.mds_asok(["flush", "journal"])

        background_thread = gevent.spawn(background)
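
        # Every file created above becomes one stray inode when unlinked; the
        # extra +1 accounts for the "delete_me" directory itself.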
        total_inodes = file_multiplier * self.throttle_workload_size_range + 1
        mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds'))
        mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds'))

        # During this phase we look for the concurrent ops to exceed half
        # the limit (a heuristic) and not exceed the limit (a correctness
        # condition), polling the perf counters as the purge proceeds.
        purge_timeout = 600  # overall timeout for the purge (assumed value)
        elapsed = 0
        files_high_water = 0
        ops_high_water = 0

        while True:
            stats = self.fs.mds_asok(['perf', 'dump'])
            mdc_stats = stats['mds_cache']
            pq_stats = stats['purge_queue']
            if elapsed >= purge_timeout:
                raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))

            num_strays = mdc_stats['num_strays']
            num_strays_purging = pq_stats['pq_executing']
            num_purge_ops = pq_stats['pq_executing_ops']

            self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops])

            files_high_water = max(files_high_water, num_strays_purging)
            ops_high_water = max(ops_high_water, num_purge_ops)

            total_strays_created = mdc_stats['strays_created']
            total_strays_purged = pq_stats['pq_executed']

            if total_strays_purged == total_inodes:
                log.info("Complete purge in {0} seconds".format(elapsed))
                break
            elif total_strays_purged > total_inodes:
                raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))
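
            # Purge still in progress: check the in-flight counters against
            # the configured limits for whichever throttle is under test.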
            if throttle_type == self.OPS_THROTTLE:
                # 11 is filer_max_purge_ops plus one for the backtrace:
                # limit is allowed to be overshot by this much.
                if num_purge_ops > mds_max_purge_ops + 11:
                    raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
                        num_purge_ops, mds_max_purge_ops
                    ))
            elif throttle_type == self.FILES_THROTTLE:
                if num_strays_purging > mds_max_purge_files:
                    raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
                        num_strays_purging, mds_max_purge_files
                    ))
            else:
                raise NotImplementedError(throttle_type)

            log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
                num_strays_purging, num_strays,
                total_strays_purged, total_strays_created
            ))
            time.sleep(1)
            elapsed += 1

        background_thread.join()

        # Check that we got up to a respectable rate during the purge.  This is totally
        # racy, but should be safeish unless the cluster is pathologically slow, or
        # insanely fast such that the deletions all pass before we have polled the
        # stats even once.
        if throttle_type == self.OPS_THROTTLE:
            if ops_high_water < mds_max_purge_ops / 2:
                raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
                    ops_high_water, mds_max_purge_ops
                ))
        elif throttle_type == self.FILES_THROTTLE:
            if files_high_water < mds_max_purge_files / 2:
                raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
                    files_high_water, mds_max_purge_files
                ))

        # Sanity check all MDC stray stats
        stats = self.fs.mds_asok(['perf', 'dump'])
        mdc_stats = stats['mds_cache']
        pq_stats = stats['purge_queue']
        self.assertEqual(mdc_stats['num_strays'], 0)
        self.assertEqual(mdc_stats['num_strays_delayed'], 0)
        self.assertEqual(pq_stats['pq_executing'], 0)
        self.assertEqual(pq_stats['pq_executing_ops'], 0)
        self.assertEqual(mdc_stats['strays_created'], total_inodes)
        self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
        self.assertEqual(pq_stats['pq_executed'], total_inodes)

    def get_mdc_stat(self, name, mds_id=None):
        return self.get_stat("mds_cache", name, mds_id)

    def get_stat(self, subsys, name, mds_id=None):
        return self.fs.mds_asok(['perf', 'dump', subsys, name],
                                mds_id=mds_id)[subsys][name]

    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
                          mds_id=None):
        self.wait_until_equal(
            lambda: self.get_stat(subsys, counter, mds_id),
            expect_val=expect_val, timeout=timeout,
            reject_fn=lambda x: x > expect_val
        )

    def test_open_inode(self):
        """
        That the case of a dentry unlinked while a client holds an
        inode open is handled correctly.

        The inode should be moved into a stray dentry, while the original
        dentry and directory should be purged.

        The inode's data should be purged when the client eventually closes
        it.
        """
        mount_a_client_id = self.mount_a.get_global_id()

        # Write some bytes to a file
        size_mb = 8  # assumed size for the test data
        p = self.mount_a.open_background("open_file")
        self.mount_a.write_n_mb("open_file", size_mb)
        open_file_ino = self.mount_a.path_to_ino("open_file")

        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)
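
        # Unlink the file while the client still holds it open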
        self.mount_a.run_shell(["rm", "-f", "open_file"])

        # Wait to see the stray count increment
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)

        # See that while the stray count has incremented, none have passed
        # on to the purge queue
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that the client still holds 2 caps
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # See that the data objects remain in the data pool
        self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))
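
        # Let the client close the file: only then should the stray be
        # enqueued and its data purged.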
        self.mount_a.kill_background(p)

        # Wait to see the client cap count decrement
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
        )

        # Wait to see the purge counter increment, stray count go to zero
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=6, reject_fn=lambda x: x > 1
        )
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        # See that the data objects no longer exist
        self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))

        self.await_data_pool_empty()

    def test_hardlink_reintegration(self):
        """
        That removal of primary dentry of hardlinked inode results
        in reintegration of inode into the previously-remote dentry,
        rather than lingering as a stray indefinitely.
        """
        # Write some bytes to file_a
        size_mb = 8  # assumed size for the test data
        self.mount_a.run_shell(["mkdir", "dir_1"])
        self.mount_a.write_n_mb("dir_1/file_a", size_mb)
        ino = self.mount_a.path_to_ino("dir_1/file_a")

        # Create a hardlink named file_b
        self.mount_a.run_shell(["mkdir", "dir_2"])
        self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
        self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)
        self.fs.mds_asok(['flush', 'journal'])

        # See that backtrace for the file points to the file_a path
        pre_unlink_bt = self.fs.read_backtrace(ino)
        self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")

        # empty mds cache. otherwise mds reintegrates stray when unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
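
        # Unlink the primary dentry; because the cache is cold the inode should
        # become a stray rather than being reintegrated immediately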
        self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])

        # See that a stray was created
        self.assertEqual(self.get_mdc_stat("num_strays"), 1)
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)

        # Wait, see that data objects are still present (i.e. that the
        # stray did not advance to purging given time)
        time.sleep(30)
        self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that before reintegration, the inode's backtrace points to a stray dir
        self.fs.mds_asok(['flush', 'journal'])
        self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")

        # Do a metadata operation on the remaining link (mv is heavy handed, but
        # others like touch may be satisfied from caps without poking MDS)
        self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the eval_remote call
        # on responding to a client request.
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the remaining link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")

        # mds should reintegrate the stray when the unlink finishes
        self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the notify_stray call
        # on completion of unlink
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the newest link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")

        # Now really delete it
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()
        self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))

        # We caused the inode to go stray 3 times
        self.assertEqual(self.get_mdc_stat("strays_created"), 3)
        # We purged it at the last
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)

    def test_mv_hardlink_cleanup(self):
        """
        That when doing a rename from A to B, and B has hardlinks,
        then we make a stray for B which is then reintegrated
        into one of its hardlinks.
        """
        # Create file_a, file_b, and a hardlink to file_b
        size_mb = 8  # assumed size for the test data
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")

        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
        self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)

        self.mount_a.run_shell(["mv", "file_a", "file_b"])

        # Stray reintegration should happen as a result of the notify_stray call on
        # completion of rename
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=60
        )

        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)

        # No data objects should have been deleted, as both files still have linkage.
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
        self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))

        self.fs.mds_asok(['flush', 'journal'])

        post_reint_bt = self.fs.read_backtrace(file_b_ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")

    def _setup_two_ranks(self):
        self.fs.set_allow_multimds(True)
        self.fs.set_max_mds(2)

        # See that we have two active MDSs
        self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
                              reject_fn=lambda v: v > 2 or v < 1)

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]
        log.info("Ranks 0 and 1 are {0} and {1}".format(
            rank_0_id, rank_1_id))

        # Get rid of other MDS daemons so that it's easier to know which
        # daemons to expect in which ranks after restarts
        for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}:
            self.mds_cluster.mds_stop(unneeded_mds)
            self.mds_cluster.mds_fail(unneeded_mds)

        return rank_0_id, rank_1_id

    def _force_migrate(self, to_id, path, watch_ino):
        """
        :param to_id: MDS id to move it to
        :param path: Filesystem path (string) to move
        :param watch_ino: Inode number to look for at destination to confirm move
        """
        self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path])

        # Poll the MDS cache dump to watch for the export completing
        migrated = False
        migrate_timeout = 60
        migrate_elapsed = 0
        while not migrated:
            data = self.fs.mds_asok(["dump", "cache"], to_id)
            for inode_data in data:
                if inode_data['ino'] == watch_ino:
                    log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
                    if inode_data['is_auth'] is True:
                        migrated = True
                    break

            if not migrated:
                if migrate_elapsed > migrate_timeout:
                    raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed))
                else:
                    migrate_elapsed += 1
                    time.sleep(1)

    def _is_stopped(self, rank):
        mds_map = self.fs.get_mds_map()
        return rank not in [i['rank'] for i in mds_map['info'].values()]

    def test_purge_on_shutdown(self):
        """
        That when an MDS rank is shut down, its purge queue is
        drained in the process.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()
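
        # With mds_max_purge_files=0 on rank 1, strays can be created and
        # enqueued but nothing will actually be executed from its purge queue.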
        file_count = 5  # assumed workload size; we just need some strays to exist

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        self.mount_a.umount_wait()

        # See all the strays go into purge queue
        self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
        self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
        self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)

        # See nothing get purged from the purge queue (yet)
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)

        self.fs.set_max_mds(1)
        self.fs.deactivate(1)

        # It shouldn't proceed past stopping because it's still not allowed
        # to purge
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
        self.assertFalse(self._is_stopped(1))

        # Permit the daemon to start purging again
        self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
                                            'injectargs',
                                            "--mds_max_purge_files 100")

        # It should now proceed through shutdown
        self.wait_until_true(
            lambda: self._is_stopped(1),
            timeout=120
        )

        # ...and in the process purge all that data
        self.await_data_pool_empty()

    def test_migration_on_shutdown(self):
        """
        That when an MDS rank is shut down, any non-purgeable strays
        get migrated to another rank.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Create a non-purgeable stray in a ~mds1 stray directory
        # by doing a hard link and deleting the original file
        self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"])
        self.mount_a.run_shell(["touch", "dir_1/original"])
        self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"])

        self._force_migrate(rank_1_id, "dir_1",
                            self.mount_a.path_to_ino("dir_1/original"))

        # empty mds cache. otherwise mds reintegrates stray when unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'], rank_0_id)
        self.fs.mds_asok(['flush', 'journal'], rank_1_id)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
        self.mount_a.umount_wait()

        self._wait_for_counter("mds_cache", "strays_created", 1,
                               mds_id=rank_1_id)

        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "1")
        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'deactivate', "1")

        # Wait til we get to a single active MDS mdsmap state
        self.wait_until_true(lambda: self._is_stopped(1), timeout=120)

        # See that the stray counter on rank 0 has incremented
        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)

    def assert_backtrace(self, ino, expected_path):
        """
        Assert that the backtrace in the data pool for an inode matches
        an expected /foo/bar path.
        """
        expected_elements = expected_path.strip("/").split("/")
        bt = self.fs.read_backtrace(ino)
        actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']]))
        self.assertListEqual(expected_elements, actual_elements)

    def get_backtrace_path(self, ino):
        bt = self.fs.read_backtrace(ino)
        elements = reversed([dn['dname'] for dn in bt['ancestors']])
        return "/".join(elements)

    def assert_purge_idle(self):
        """
        Assert that the MDS perf counters indicate no strays exist and
        no ongoing purge activity.  Sanity check for when PurgeQueue should
        be idle.
        """
        mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
        pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
        self.assertEqual(mdc_stats["num_strays"], 0)
        self.assertEqual(mdc_stats["num_strays_delayed"], 0)
        self.assertEqual(pq_stats["pq_executing"], 0)
        self.assertEqual(pq_stats["pq_executing_ops"], 0)

    def test_mv_cleanup(self):
        """
        That when doing a rename from A to B, and B has no hardlinks,
        then we make a stray for B and purge it.
        """
        # Create file_a and file_b, write some to both
        size_mb = 8  # assumed size for the test data
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")
        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_a")
        self.assert_backtrace(file_b_ino, "file_b")

        self.mount_a.run_shell(['mv', 'file_a', 'file_b'])

        # See that stray counter increments
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        # Wait for purge counter to increment
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()

        # file_b should have been purged
        self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))

        # Backtrace should have updated from file_a to file_b
        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_b")

        # file_a's data should still exist
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))

    def _pool_df(self, pool_name):
        """
        Return the stats dict for the named pool from `ceph df`, containing
        fields such as:
            "max_avail": 19630292406,

        :param pool_name: Which pool (must exist)
        """
        out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")
        for p in json.loads(out)['pools']:
            if p['name'] == pool_name:
                return p['stats']

        raise RuntimeError("Pool '{0}' not found".format(pool_name))

    def await_data_pool_empty(self):
        self.wait_until_true(
            lambda: self._pool_df(
                self.fs.get_data_pool_name()
            )['objects'] == 0,
            timeout=60)

    def test_snapshot_remove(self):
        """
        That removal of a snapshot that references a now-unlinked file results
        in purging on the stray for the file.
        """
        self.fs.mon_manager.raw_cluster_cmd("mds", "set", "allow_new_snaps", "true",
                                            "--yes-i-really-mean-it")

        # Create a dir with a file in it
        size_mb = 8  # assumed size for the test data
        self.mount_a.run_shell(["mkdir", "snapdir"])
        self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
        self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
        file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")

        self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])

        # Cause the head revision to deviate from the snapshot
        self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)

        # Flush the journal so that backtraces, dirfrag objects will actually be written
        self.fs.mds_asok(["flush", "journal"])

        self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
        self.mount_a.run_shell(["rmdir", "snapdir/subdir"])

        # Unmount the client because when I come back to check the data is still
        # in the file I don't want to just see what's in the page cache.
        self.mount_a.umount_wait()

        self.assertEqual(self.get_mdc_stat("strays_created"), 2)

        # FIXME: at this stage we see a purge and the stray count drops to
        # zero, but there's actually still a stray, so at the very
        # least the StrayManager stats code is slightly off

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # See that the data from the snapshotted revision of the file is still present
        self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)

        # Remove the snapshot
        self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])
        self.mount_a.umount_wait()

        # Purging file_a doesn't happen until after we've flushed the journal, because
        # it is referenced by the snapshotted subdir, and the snapshot isn't really
        # gone until the journal references to it are gone
        self.fs.mds_asok(["flush", "journal"])

        # See that a purge happens now
        self._wait_for_counter("mds_cache", "strays_enqueued", 2)
        self._wait_for_counter("purge_queue", "pq_executed", 2)

        self.assertTrue(self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024))
        self.await_data_pool_empty()

    def test_fancy_layout(self):
        """
        purge stray file with fancy layout
        """
        file_name = "fancy_layout_file"
        self.mount_a.run_shell(["touch", file_name])

        file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
        self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)
        # 35MB requires 7 objects
        size_mb = 35
        self.mount_a.write_n_mb(file_name, size_mb)

        self.mount_a.run_shell(["rm", "-f", file_name])
        self.fs.mds_asok(["flush", "journal"])

        # can't use self.fs.data_objects_absent here, it does not support fancy layout
        self.await_data_pool_empty()

    def test_dirfrag_limit(self):
        """
        That the directory fragment size cannot exceed
        mds_bal_fragment_size_max (using a limit of 50 in all configurations).

        That fragmentation (forced) will allow more entries to be created.

        That unlinking fails when the stray directory fragment becomes too
        large and that unlinking may continue once those strays are purged.
        """
        self.fs.set_allow_dirfrags(True)

        LOW_LIMIT = 50
        for mds in self.fs.get_daemon_names():
            self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds)

        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir")
                os.mkdir(path)
                for n in range(0, {file_count}):
                    open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
                """.format(
                    path=self.mount_a.mountpoint,
                    file_count=LOW_LIMIT + 1
                )))
        except CommandFailedError:
            pass  # Expected: creating the last file fails once the fragment is full
        else:
            raise RuntimeError("fragment size exceeded")

        # Now test that we can go beyond the limit if we fragment the directory
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            os.mkdir(path)
            for n in range(0, {file_count}):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            dfd = os.open(path, os.O_DIRECTORY)
            os.fsync(dfd)
            """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT
            )))

        # Ensure that subdir2 is fragmented
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id)

        # remount+flush (release client caps)
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # Create 50% more files than the current fragment limit
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            for n in range({file_count}, ({file_count}*3)//2):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT
            )))

        # Now test that the stray directory size is limited and recovers
        strays_before = self.get_mdc_stat("strays_created")
        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir3")
                os.mkdir(path)
                for n in range({file_count}):
                    fpath = os.path.join(path, "%s" % n)
                    f = open(fpath, 'w')
                    f.write("%s" % n)
                    f.close()
                    os.unlink(fpath)
                """.format(
                    path=self.mount_a.mountpoint,
                    file_count=LOW_LIMIT * 10  # 10 stray directories, should collide before this count
                )))
        except CommandFailedError:
            pass  # Expected: unlinking fails once the stray fragments are full
        else:
            raise RuntimeError("fragment size exceeded")

        strays_after = self.get_mdc_stat("strays_created")
        self.assertGreaterEqual(strays_after - strays_before, LOW_LIMIT)

        self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
        self._wait_for_counter("purge_queue", "pq_executed", strays_after)
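
        # With the earlier strays purged, the same create-and-unlink workload
        # should now fit within the stray fragment limit.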
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir4")
            os.mkdir(path)
            for n in range({file_count}):
                fpath = os.path.join(path, "%s" % n)
                f = open(fpath, 'w')
                f.write("%s" % n)
                f.close()
                os.unlink(fpath)
            """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT
            )))

    def test_purge_queue_upgrade(self):
        """
        That when starting on a system with no purge queue in the metadata
        pool, we silently create one.
        """
        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
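
        # Object 500.00000000 is the header of the purge queue journal;
        # removing it simulates a metadata pool from before PurgeQueue existed.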
        self.fs.rados(["rm", "500.00000000"])
        self.mds_cluster.mds_restart()
        self.fs.wait_for_daemons()

    def test_purge_queue_op_rate(self):
        """
        A busy purge queue is meant to aggregate operations sufficiently
        that our RADOS ops to the metadata pool are not O(files).  Check
        that this is the case.
        """
        # For low rates of deletion, the rate of metadata ops actually
        # will be o(files), so to see the desired behaviour we have to give
        # the system a significant quantity, i.e. an order of magnitude
        # more than the number of files it will purge at one time.
        max_purge_files = 2

        # Workload sizes (assumed values): phase 2 deletes twice as many
        # files as phase 1, and both are well above max_purge_files.
        phase_1_files = 256
        phase_2_files = 512

        self.set_conf('mds', 'mds_bal_frag', 'false')
        self.set_conf('mds', 'mds_max_purge_files', "%d" % max_purge_files)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.mount_a.run_shell(["mkdir", "phase1"])
        self.mount_a.create_n_files("phase1/file", phase_1_files)

        self.mount_a.run_shell(["mkdir", "phase2"])
        self.mount_a.create_n_files("phase2/file", phase_2_files)
        def unlink_and_count_ops(path, expected_deletions):
            initial_ops = self.get_stat("objecter", "op")
            initial_pq_executed = self.get_stat("purge_queue", "pq_executed")

            self.mount_a.run_shell(["rm", "-rf", path])

            self._wait_for_counter(
                "purge_queue", "pq_executed", initial_pq_executed + expected_deletions
            )

            final_ops = self.get_stat("objecter", "op")

            # Calculation of the *overhead* operations, i.e. do not include
            # the operations where we actually delete files.
            return final_ops - initial_ops - expected_deletions

        self.fs.mds_asok(['flush', 'journal'])
        phase1_ops = unlink_and_count_ops("phase1/", phase_1_files + 1)

        self.fs.mds_asok(['flush', 'journal'])
        phase2_ops = unlink_and_count_ops("phase2/", phase_2_files + 1)

        log.info("Phase 1: {0}".format(phase1_ops))
        log.info("Phase 2: {0}".format(phase2_ops))

        # The success criterion is that deleting double the number
        # of files doesn't generate double the number of overhead ops
        # -- this comparison is a rough approximation of that rule.
        self.assertTrue(phase2_ops < phase1_ops * 1.25)

        # Finally, check that our activity did include properly quiescing
        # the queue (i.e. call to Journaler::write_head in the right place),
        # by restarting the MDS and checking that it doesn't try re-executing
        # any of the work we did.
        self.fs.mds_asok(['flush', 'journal'])  # flush to ensure no strays
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        self.assertEqual(self.get_stat("purge_queue", "pq_executed"), 0)