import json
import time
import logging
from textwrap import dedent
import datetime
import gevent

from teuthology.exceptions import CommandFailedError
from teuthology.orchestra.run import Raw
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology

log = logging.getLogger(__name__)


class TestStrays(CephFSTestCase):
    MDSS_REQUIRED = 2

    OPS_THROTTLE = 1
    FILES_THROTTLE = 2

    # Range of different file sizes used in throttle test's workload
    throttle_workload_size_range = 16

    @for_teuthology
    def test_ops_throttle(self):
        self._test_throttling(self.OPS_THROTTLE)

    @for_teuthology
    def test_files_throttle(self):
        self._test_throttling(self.FILES_THROTTLE)

    def test_dir_deletion(self):
        """
        That when deleting a bunch of dentries and the containing
        directory, everything gets purged.

        Catches cases where the client might e.g. fail to trim
        the unlinked dir from its cache.
        """
        file_count = 1000
        create_script = dedent("""
            import os

            mountpoint = "{mountpoint}"
            subdir = "delete_me"
            size = {size}
            file_count = {file_count}
            os.mkdir(os.path.join(mountpoint, subdir))
            for i in range(0, file_count):
                filename = "{{0}}_{{1}}.bin".format(i, size)
                with open(os.path.join(mountpoint, subdir, filename), 'w') as f:
                    f.write(size * 'x')
            """.format(
                mountpoint=self.mount_a.mountpoint,
                size=1024,
                file_count=file_count
            ))

        self.mount_a.run_python(create_script)

        # That the dirfrag object is created
        self.fs.mds_asok(["flush", "journal"])
        dir_ino = self.mount_a.path_to_ino("delete_me")
        self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))

        # Remove everything
        self.mount_a.run_shell(["rm", "-rf", "delete_me"])
        self.fs.mds_asok(["flush", "journal"])

        # That all the removed files get created as strays
        strays = self.get_mdc_stat("strays_created")
        self.assertEqual(strays, file_count + 1)

        # That the strays all get enqueued for purge
        self.wait_until_equal(
            lambda: self.get_mdc_stat("strays_enqueued"),
            strays,
            timeout=600
        )

        # That all the purge operations execute
        self.wait_until_equal(
            lambda: self.get_stat("purge_queue", "pq_executed"),
            strays,
            timeout=600
        )

        # That finally, the directory metadata object is gone
        self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))

        # That finally, the data objects are all gone
        self.await_data_pool_empty()

    def _test_throttling(self, throttle_type):
        self.data_log = []
        try:
            return self._do_test_throttling(throttle_type)
        except:
            for l in self.data_log:
                log.info(",".join([l_.__str__() for l_ in l]))
            raise

    def _do_test_throttling(self, throttle_type):
        """
        That the mds_max_purge_ops setting is respected
        """

        def set_throttles(files, ops):
            """
            Helper for updating ops/files limits, and calculating effective
            ops_per_pg setting to give the same ops limit.
            """
            self.set_conf('mds', 'mds_max_purge_files', "%d" % files)
            self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops)

            pgs = self.fs.mon_manager.get_pool_int_property(
                self.fs.get_data_pool_name(),
                "pg_num"
            )
            ops_per_pg = float(ops) / pgs
            self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg)
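            # For example (hypothetical numbers): with ops=16 and a data pool
            # of 8 PGs, ops_per_pg is set to 2.0, so the per-PG throttle
            # describes the same overall limit as mds_max_purge_ops.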

        # Test conditions depend on what we're going to be exercising.
        # * Lift the threshold on whatever throttle we are *not* testing, so
        #   that the throttle of interest is the one that will be the bottleneck
        # * Create either many small files (test file count throttling) or fewer
        #   large files (test op throttling)
        if throttle_type == self.OPS_THROTTLE:
            set_throttles(files=100000000, ops=16)
            size_unit = 1024 * 1024  # big files, generate lots of ops
            file_multiplier = 100
        elif throttle_type == self.FILES_THROTTLE:
            # The default file limit is pretty permissive, so to avoid the test
            # running too fast, create lots of files and set the limit
            # pretty low.
            set_throttles(ops=100000000, files=6)
            size_unit = 1024  # small, numerous files
            file_multiplier = 200
        else:
            raise NotImplementedError(throttle_type)

        # Pick up config changes
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        create_script = dedent("""
            import os

            mountpoint = "{mountpoint}"
            subdir = "delete_me"
            size_unit = {size_unit}
            file_multiplier = {file_multiplier}
            os.mkdir(os.path.join(mountpoint, subdir))
            for i in range(0, file_multiplier):
                for size in range(0, {size_range}*size_unit, size_unit):
                    filename = "{{0}}_{{1}}.bin".format(i, size // size_unit)
                    with open(os.path.join(mountpoint, subdir, filename), 'w') as f:
                        f.write(size * 'x')
            """.format(
                mountpoint=self.mount_a.mountpoint,
                size_unit=size_unit,
                file_multiplier=file_multiplier,
                size_range=self.throttle_workload_size_range
            ))

        self.mount_a.run_python(create_script)

        # We will run the deletion in the background, to reduce the risk of it completing before
        # we have started monitoring the stray statistics.
        def background():
            self.mount_a.run_shell(["rm", "-rf", "delete_me"])
            self.fs.mds_asok(["flush", "journal"])

        background_thread = gevent.spawn(background)

        total_inodes = file_multiplier * self.throttle_workload_size_range + 1
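        # (the +1 accounts for the "delete_me" directory itself, which also
        #  becomes a stray when it is removed)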
        mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds'))
        mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds'))

        # During this phase we look for the concurrent ops to exceed half
        # the limit (a heuristic) and not exceed the limit (a correctness
        # condition).
        purge_timeout = 600
        elapsed = 0
        files_high_water = 0
        ops_high_water = 0

        while True:
            stats = self.fs.mds_asok(['perf', 'dump'])
            mdc_stats = stats['mds_cache']
            pq_stats = stats['purge_queue']
            if elapsed >= purge_timeout:
                raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))

            num_strays = mdc_stats['num_strays']
            num_strays_purging = pq_stats['pq_executing']
            num_purge_ops = pq_stats['pq_executing_ops']
            files_high_water = pq_stats['pq_executing_high_water']
            ops_high_water = pq_stats['pq_executing_ops_high_water']
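            # pq_executing counts stray purges currently in flight and
            # pq_executing_ops the RADOS operations those purges have
            # outstanding; the *_high_water counters record the maxima
            # the purge queue has observed.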

            self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops, files_high_water, ops_high_water])

            total_strays_created = mdc_stats['strays_created']
            total_strays_purged = pq_stats['pq_executed']

            if total_strays_purged == total_inodes:
                log.info("Complete purge in {0} seconds".format(elapsed))
                break
            elif total_strays_purged > total_inodes:
                raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))
            else:
                if throttle_type == self.OPS_THROTTLE:
                    # 11 is filer_max_purge_ops plus one for the backtrace:
                    # limit is allowed to be overshot by this much.
                    if num_purge_ops > mds_max_purge_ops + 11:
                        raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
                            num_purge_ops, mds_max_purge_ops
                        ))
                elif throttle_type == self.FILES_THROTTLE:
                    if num_strays_purging > mds_max_purge_files:
                        raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
                            num_strays_purging, mds_max_purge_files
                        ))
                else:
                    raise NotImplementedError(throttle_type)

                log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
                    num_strays_purging, num_strays,
                    total_strays_purged, total_strays_created
                ))
                time.sleep(1)
                elapsed += 1

        background_thread.join()

        # Check that we got up to a respectable rate during the purge. This is totally
        # racy, but should be safeish unless the cluster is pathologically slow, or
        # insanely fast such that the deletions all pass before we have polled the
        # statistics.
        if throttle_type == self.OPS_THROTTLE:
            if ops_high_water < mds_max_purge_ops // 2:
                raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
                    ops_high_water, mds_max_purge_ops
                ))
            # The MDS may go over mds_max_purge_ops for some items, like a
            # heavily fragmented directory. The throttle does not kick in
            # until *after* we reach or exceed the limit. This is expected
            # because we don't want to starve the PQ or never purge a
            # particularly large file/directory.
            self.assertLessEqual(ops_high_water, mds_max_purge_ops + 64)
        elif throttle_type == self.FILES_THROTTLE:
            if files_high_water < mds_max_purge_files // 2:
                raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
                    files_high_water, mds_max_purge_files
                ))
            self.assertLessEqual(files_high_water, mds_max_purge_files)

        # Sanity check all MDC stray stats
        stats = self.fs.mds_asok(['perf', 'dump'])
        mdc_stats = stats['mds_cache']
        pq_stats = stats['purge_queue']
        self.assertEqual(mdc_stats['num_strays'], 0)
        self.assertEqual(mdc_stats['num_strays_delayed'], 0)
        self.assertEqual(pq_stats['pq_executing'], 0)
        self.assertEqual(pq_stats['pq_executing_ops'], 0)
        self.assertEqual(mdc_stats['strays_created'], total_inodes)
        self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
        self.assertEqual(pq_stats['pq_executed'], total_inodes)

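    # Helpers for reading a single MDS perf counter out of "perf dump",
    # scoped by subsystem and counter name.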
    def get_mdc_stat(self, name, mds_id=None):
        return self.get_stat("mds_cache", name, mds_id)

    def get_stat(self, subsys, name, mds_id=None):
        return self.fs.mds_asok(['perf', 'dump', subsys, name],
                                mds_id=mds_id)[subsys][name]

    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
                          mds_id=None):
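        """
        Wait for the given perf counter to reach expect_val, failing
        immediately if it overshoots past expect_val.
        """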
        self.wait_until_equal(
            lambda: self.get_stat(subsys, counter, mds_id),
            expect_val=expect_val, timeout=timeout,
            reject_fn=lambda x: x > expect_val
        )

    def test_open_inode(self):
        """
        That the case of a dentry unlinked while a client holds an
        inode open is handled correctly.

        The inode should be moved into a stray dentry, while the original
        dentry and directory should be purged.

        The inode's data should be purged when the client eventually closes
        it.
        """
        mount_a_client_id = self.mount_a.get_global_id()

        # Write some bytes to a file
        size_mb = 8

        # Hold the file open
        p = self.mount_a.open_background("open_file")
        self.mount_a.write_n_mb("open_file", size_mb)
        open_file_ino = self.mount_a.path_to_ino("open_file")

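        # Two caps are expected here (presumably one for the mount's root
        # directory and one for the newly created "open_file")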
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # Unlink the dentry
        self.mount_a.run_shell(["rm", "-f", "open_file"])

        # Wait to see the stray count increment
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)

        # See that while the stray count has incremented, none have passed
        # on to the purge queue
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that the client still holds 2 caps
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # See that the data objects remain in the data pool
        self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))

        # Now close the file
        self.mount_a.kill_background(p)

        # Wait to see the client cap count decrement
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
        )
        # Wait to see the purge counter increment, stray count go to zero
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=6, reject_fn=lambda x: x > 1
        )
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        # See that the data objects no longer exist
        self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))

        self.await_data_pool_empty()

    def test_reintegration_limit(self):
        """
        That reintegration is not blocked by full directories.
        """

        LOW_LIMIT = 50
        self.config_set('mds', 'mds_bal_fragment_size_max', str(LOW_LIMIT))
        time.sleep(10)  # for config to reach MDS; async create is fast!!

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.mount_a.run_shell_payload("""
            mkdir a b
            for i in `seq 1 50`; do
                touch a/"$i"
                ln a/"$i" b/"$i"
            done
            sync -f a b
            rm a/*
            """)

        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)

    def test_hardlink_reintegration(self):
        """
        That removal of the primary dentry of a hardlinked inode results
        in reintegration of the inode into the previously-remote dentry,
        rather than lingering as a stray indefinitely.
        """
        # Write some bytes to file_a
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "dir_1"])
        self.mount_a.write_n_mb("dir_1/file_a", size_mb)
        ino = self.mount_a.path_to_ino("dir_1/file_a")

        # Create a hardlink named file_b
        self.mount_a.run_shell(["mkdir", "dir_2"])
        self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
        self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)

        # Flush journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the file_a path
        pre_unlink_bt = self.fs.read_backtrace(ino)
        self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")

        # Empty the MDS cache; otherwise the MDS reintegrates the stray as
        # soon as the unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount_wait()

        # Unlink file_a
        self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])

        # See that a stray was created
        self.assertEqual(self.get_mdc_stat("num_strays"), 1)
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)

        # Wait, then see that the data objects are still present (i.e. that the
        # stray did not advance to purging, given time to do so)
        time.sleep(30)
        self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that before reintegration, the inode's backtrace points to a stray dir
        self.fs.mds_asok(['flush', 'journal'])
        self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")

        # Do a metadata operation on the remaining link (mv is heavy handed, but
        # others like touch may be satisfied from caps without poking MDS)
        self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the eval_remote call
        # on responding to a client request.
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the remaining link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")

        # The MDS should reintegrate the stray when the unlink finishes
        self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the notify_stray call
        # on completion of unlink
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the newest link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")

        # Now really delete it
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()
        self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))

        # We caused the inode to go stray 3 times
        self.assertEqual(self.get_mdc_stat("strays_created"), 3)
        # We purged it only at the end
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)

    def test_reintegration_via_scrub(self):
        """
        That reintegration is triggered via recursive scrub.
        """

        self.mount_a.run_shell_payload("""
            mkdir -p a b
            for i in `seq 1 50`; do
                touch a/"$i"
                ln a/"$i" b/"$i"
            done
            sync -f .
            """)

        self.mount_a.remount()  # drop caps/cache
        self.fs.rank_tell(["flush", "journal"])
        self.fs.rank_fail()
        self.fs.wait_for_daemons()

        # only / in cache, reintegration cannot happen
        self.wait_until_equal(
            lambda: len(self.fs.rank_tell(["dump", "tree", "/"])),
            expect_val=3,
            timeout=60
        )

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.mount_a.run_shell_payload("""
            rm a/*
            sync -f .
            """)
        self.wait_until_equal(
            lambda: len(self.fs.rank_tell(["dump", "tree", "/"])),
            expect_val=3,
            timeout=60
        )
        self.assertEqual(self.get_mdc_stat("num_strays"), 50)
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertEqual(last_reintegrated, curr_reintegrated)

        self.fs.rank_tell(["scrub", "start", "/", "recursive,force"])

        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        # N.B.: reintegrate (rename RPC) may be tried multiple times from different code paths
        self.assertGreaterEqual(curr_reintegrated, last_reintegrated + 50)

    def test_mv_hardlink_cleanup(self):
        """
        That when doing a rename from A to B, and B has hardlinks,
        then we make a stray for B which is then reintegrated
        into one of its hardlinks.
        """
        # Create file_a, file_b, and a hardlink to file_b
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")

        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
        self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)

        # mv file_a file_b
        self.mount_a.run_shell(["mv", "file_a", "file_b"])

        # Stray reintegration should happen as a result of the notify_stray call on
        # completion of rename
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)

        # No data objects should have been deleted, as both files still have linkage.
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
        self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))

        self.fs.mds_asok(['flush', 'journal'])

        post_reint_bt = self.fs.read_backtrace(file_b_ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")

    def _setup_two_ranks(self):
        # Set up two MDSs
        self.fs.set_max_mds(2)

        # See that we have two active MDSs
        self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
                              reject_fn=lambda v: v > 2 or v < 1)

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]
        log.info("Ranks 0 and 1 are {0} and {1}".format(
            rank_0_id, rank_1_id))

        # Get rid of other MDS daemons so that it's easier to know which
        # daemons to expect in which ranks after restarts
        for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}:
            self.mds_cluster.mds_stop(unneeded_mds)
            self.mds_cluster.mds_fail(unneeded_mds)

        return rank_0_id, rank_1_id

    def _force_migrate(self, path, rank=1):
        """
        :param path: filesystem path (string) to migrate
        :param rank: MDS rank to pin the path to
        :return: None
        """
        self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", str(rank), path])
        rpath = "/" + path
        self._wait_subtrees([(rpath, rank)], rank=rank, path=rpath)

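    # A rank counts as stopped once no daemon in the MDS map holds that rank.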
    def _is_stopped(self, rank):
        mds_map = self.fs.get_mds_map()
        return rank not in [i['rank'] for i in mds_map['info'].values()]

    def test_purge_on_shutdown(self):
        """
        That when an MDS rank is shut down, its purge queue is
        drained in the process.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 5

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate("delete_me")

        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        self.mount_a.umount_wait()

        # See all the strays go into purge queue
        self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
        self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
        self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)

        # See nothing get purged from the purge queue (yet)
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)

        # Shut down rank 1
        self.fs.set_max_mds(1)

        # It shouldn't proceed past stopping because it's still not allowed
        # to purge
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
        self.assertFalse(self._is_stopped(1))

        # Permit the daemon to start purging again
        self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
                                            'injectargs',
                                            "--mds_max_purge_files 100")

        # It should now proceed through shutdown
        self.fs.wait_for_daemons(timeout=120)

        # ...and in the process purge all that data
        self.await_data_pool_empty()

    def test_migration_on_shutdown(self):
        """
        That when an MDS rank is shut down, any non-purgeable strays
        get migrated to another rank.
        """

        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Create a non-purgeable stray in a ~mds1 stray directory
        # by doing a hard link and deleting the original file
        self.mount_a.run_shell_payload("""
            mkdir dir_1 dir_2
            touch dir_1/original
            ln dir_1/original dir_2/linkto
            """)

        self._force_migrate("dir_1")
        self._force_migrate("dir_2", rank=0)

        # Empty the MDS cache; otherwise the MDS reintegrates the stray when
        # the unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'], rank_1_id)
        self.fs.mds_asok(['cache', 'drop'], rank_1_id)

        self.mount_a.mount_wait()
        self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
        self.mount_a.umount_wait()

        self._wait_for_counter("mds_cache", "strays_created", 1,
                               mds_id=rank_1_id)

        # Shut down rank 1
        self.fs.set_max_mds(1)
        self.fs.wait_for_daemons(timeout=120)

        # See that the stray counter on rank 0 has incremented
        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)

    def test_migrate_unlinked_dir(self):
        """
        Reproduce https://tracker.ceph.com/issues/53597
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.mount_a.run_shell_payload("""
            mkdir pin
            touch pin/placeholder
            """)

        self._force_migrate("pin")

        # Hold the dir open so it cannot be purged
        p = self.mount_a.open_dir_background("pin/to-be-unlinked")

        # Unlink the dentry
        self.mount_a.run_shell(["rmdir", "pin/to-be-unlinked"])

        # Wait to see the stray count increment
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays", mds_id=rank_1_id),
            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)
        # but not purged
        self.assertEqual(self.get_mdc_stat("strays_created", mds_id=rank_1_id), 1)
        self.assertEqual(self.get_mdc_stat("strays_enqueued", mds_id=rank_1_id), 0)

        # Test loading unlinked dir into cache
        self.fs.mds_asok(['flush', 'journal'], rank_1_id)
        self.fs.mds_asok(['cache', 'drop'], rank_1_id)

        # Shut down rank 1
        self.fs.set_max_mds(1)
        self.fs.wait_for_daemons(timeout=120)
        # Now the stray should be migrated to rank 0
        # self.assertEqual(self.get_mdc_stat("strays_created", mds_id=rank_0_id), 1)
        # https://github.com/ceph/ceph/pull/44335#issuecomment-1125940158

        self.mount_a.kill_background(p)

    def assert_backtrace(self, ino, expected_path):
        """
        Assert that the backtrace in the data pool for an inode matches
        an expected /foo/bar path.
        """
        expected_elements = expected_path.strip("/").split("/")
        bt = self.fs.read_backtrace(ino)
        actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']]))
        self.assertListEqual(expected_elements, actual_elements)

    def get_backtrace_path(self, ino):
        bt = self.fs.read_backtrace(ino)
        elements = reversed([dn['dname'] for dn in bt['ancestors']])
        return "/".join(elements)

    def assert_purge_idle(self):
        """
        Assert that the MDS perf counters indicate no strays exist and
        no ongoing purge activity. Sanity check for when PurgeQueue should
        be idle.
        """
        mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
        pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
        self.assertEqual(mdc_stats["num_strays"], 0)
        self.assertEqual(mdc_stats["num_strays_delayed"], 0)
        self.assertEqual(pq_stats["pq_executing"], 0)
        self.assertEqual(pq_stats["pq_executing_ops"], 0)

    def test_mv_cleanup(self):
        """
        That when doing a rename from A to B, and B has no hardlinks,
        then we make a stray for B and purge it.
        """
        # Create file_a and file_b, write some data to both
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")
        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_a")
        self.assert_backtrace(file_b_ino, "file_b")

        # mv file_a file_b
        self.mount_a.run_shell(['mv', 'file_a', 'file_b'])

        # See that the stray counter increments
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        # Wait for the purge counter to increment
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()

        # file_b should have been purged
        self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))

        # Backtrace should have updated from file_a to file_b
        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_b")

        # file_a's data should still exist
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))

    def _pool_df(self, pool_name):
        """
        Return a dict like
            {
                "kb_used": 0,
                "bytes_used": 0,
                "max_avail": 19630292406,
                "objects": 0
            }

        :param pool_name: Which pool (must exist)
        """
        out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")
        for p in json.loads(out)['pools']:
            if p['name'] == pool_name:
                return p['stats']

        raise RuntimeError("Pool '{0}' not found".format(pool_name))

    def await_data_pool_empty(self):
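        """
        Block until the data pool reports zero objects, i.e. all purges
        have completed.
        """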
        self.wait_until_true(
            lambda: self._pool_df(
                self.fs.get_data_pool_name()
            )['objects'] == 0,
            timeout=60)

    def test_snapshot_remove(self):
        """
        That removal of a snapshot that references a now-unlinked file results
        in purging of the stray for the file.
        """
        # Enable snapshots
        self.fs.set_allow_new_snaps(True)

        # Create a dir with a file in it
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "snapdir"])
        self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
        self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
        file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")

        # Snapshot the dir
        self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])

        # Cause the head revision to deviate from the snapshot
        self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)

        # Flush the journal so that backtraces, dirfrag objects will actually be written
        self.fs.mds_asok(["flush", "journal"])

        # Unlink the file
        self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
        self.mount_a.run_shell(["rmdir", "snapdir/subdir"])

        # Unmount the client so that when we come back to check the data is
        # still in the file, we are not just reading the page cache.
        self.mount_a.umount_wait()

        self.assertEqual(self.get_mdc_stat("strays_created"), 2)

        # FIXME: at this stage we see a purge and the stray count drops to
        # zero, but there's actually still a stray, so at the very
        # least the StrayManager stats code is slightly off

        self.mount_a.mount_wait()

        # See that the data from the snapshotted revision of the file is still present
        # and correct
        self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)

        # Remove the snapshot
        self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])

        # Purging file_a doesn't happen until after we've flushed the journal, because
        # it is referenced by the snapshotted subdir, and the snapshot isn't really
        # gone until the journal references to it are gone
        self.fs.mds_asok(["flush", "journal"])

        # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs.
        # See also: http://tracker.ceph.com/issues/20072
        self.wait_until_true(
            lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024),
            timeout=60
        )

        # See that a purge happens now
        self._wait_for_counter("mds_cache", "strays_enqueued", 2)
        self._wait_for_counter("purge_queue", "pq_executed", 2)

        self.await_data_pool_empty()

    def test_fancy_layout(self):
        """
        That a stray file with a fancy (striped) layout is purged.
        """

        file_name = "fancy_layout_file"
        self.mount_a.run_shell(["touch", file_name])

        file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
        self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)

        # 35MB requires 7 objects
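        # (one object set is stripe_count * object_size = 32MB over 4 objects;
        #  the remaining 3MB is striped in 1MB units across 3 further objects)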
        size_mb = 35
        self.mount_a.write_n_mb(file_name, size_mb)

        self.mount_a.run_shell(["rm", "-f", file_name])
        self.fs.mds_asok(["flush", "journal"])

        # can't use self.fs.data_objects_absent here, it does not support fancy layout
        self.await_data_pool_empty()

    def test_dirfrag_limit(self):
        """
        That the directory fragment size cannot exceed mds_bal_fragment_size_max
        (using a limit of 50 in all configurations).
        """

        LOW_LIMIT = 50
        self.config_set('mds', 'mds_bal_fragment_size_max', str(LOW_LIMIT))
        time.sleep(10)  # for config to reach MDS; async create is fast!!

        try:
            self.mount_a.create_n_files("subdir/file", LOW_LIMIT + 1, finaldirsync=True)
        except CommandFailedError:
            pass  # ENOSPC
        else:
            self.fail("fragment size exceeded")

    def test_dirfrag_limit_fragmented(self):
        """
        That fragmentation (forced) will allow more entries to be created.
        """

        LOW_LIMIT = 50
        self.config_set('mds', 'mds_bal_fragment_size_max', str(LOW_LIMIT))
        self.config_set('mds', 'mds_bal_merge_size', 1)  # disable merging
        time.sleep(10)  # for config to reach MDS; async create is fast!!

        # Test that we can go beyond the limit if we fragment the directory
        self.mount_a.create_n_files("subdir/file", LOW_LIMIT, finaldirsync=True)
        self.mount_a.umount_wait()  # release client caps

        # Ensure that subdir is fragmented
        self.fs.rank_asok(["dirfrag", "split", "/subdir", "0/0", "1"])
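        # (splits fragment 0/0 of /subdir by one bit, i.e. into two dirfrags)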
        self.fs.rank_asok(["flush", "journal"])

        # Create 50% more files than the current fragment limit
        self.mount_a.mount_wait()
        self.mount_a.create_n_files("subdir/file", (LOW_LIMIT * 3) // 2, finaldirsync=True)

    def test_dirfrag_limit_strays(self):
        """
        That unlinking fails when the stray directory fragment becomes too
        large and that unlinking may continue once those strays are purged.
        """

        LOW_LIMIT = 10
        # N.B. this test is inherently racy because stray removal may be faster
        # than slow(er) file creation.
        self.config_set('mds', 'mds_bal_fragment_size_max', LOW_LIMIT)
        time.sleep(10)  # for config to reach MDS; async create is fast!!

        # Now test that the stray directory size is limited and recovers
        strays_before = self.get_mdc_stat("strays_created")
        try:
            # 10 stray directories: expect collisions
            self.mount_a.create_n_files("subdir/file", LOW_LIMIT * 10, finaldirsync=True, unlink=True)
        except CommandFailedError:
            pass  # ENOSPC
        else:
            self.fail("fragment size exceeded")
        strays_after = self.get_mdc_stat("strays_created")
        self.assertGreaterEqual(strays_after - strays_before, LOW_LIMIT)

        self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
        self._wait_for_counter("purge_queue", "pq_executed", strays_after)

        # Verify that new files can be created and unlinked
        self.mount_a.create_n_files("subdir/file", LOW_LIMIT, dirsync=True, unlink=True)

    def test_purge_queue_upgrade(self):
        """
        That when starting on a system with no purge queue in the metadata
        pool, we silently create one.
        """

        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
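        # 500.00000000 is the purge queue journal's header object in the
        # metadata pool; removing it simulates a filesystem created before
        # the purge queue existed.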
        self.fs.radosm(["rm", "500.00000000"])
        self.mds_cluster.mds_restart()
        self.fs.wait_for_daemons()

    def test_replicated_delete_speed(self):
        """
        That deletions of replicated metadata are not pathologically slow
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 10

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate("delete_me")

        begin = datetime.datetime.now()
        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        end = datetime.datetime.now()

        # What we're really checking here is that we are completing client
        # operations immediately rather than delaying until the next tick.
        tick_period = float(self.fs.get_config("mds_tick_interval",
                                               service_type="mds"))

        duration = (end - begin).total_seconds()
        self.assertLess(duration, (file_count * tick_period) * 0.25)