import json
import time
import logging
from textwrap import dedent
import datetime
import gevent

from teuthology.exceptions import CommandFailedError
from teuthology.orchestra.run import Raw
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology

log = logging.getLogger(__name__)


class TestStrays(CephFSTestCase):
    MDSS_REQUIRED = 2

    OPS_THROTTLE = 1
    FILES_THROTTLE = 2

    # Range of different file sizes used in throttle test's workload
    throttle_workload_size_range = 16

    @for_teuthology
    def test_ops_throttle(self):
        self._test_throttling(self.OPS_THROTTLE)

    @for_teuthology
    def test_files_throttle(self):
        self._test_throttling(self.FILES_THROTTLE)

    def test_dir_deletion(self):
        """
        That when deleting a bunch of dentries and the containing
        directory, everything gets purged.

        Catches cases where the client might e.g. fail to trim
        the unlinked dir from its cache.
        """
        file_count = 1000
        create_script = dedent("""
            import os

            mountpoint = "{mountpoint}"
            subdir = "delete_me"
            size = {size}
            file_count = {file_count}
            os.mkdir(os.path.join(mountpoint, subdir))
            for i in range(0, file_count):
                filename = "{{0}}_{{1}}.bin".format(i, size)
                with open(os.path.join(mountpoint, subdir, filename), 'w') as f:
                    f.write(size * 'x')
        """.format(
            mountpoint=self.mount_a.mountpoint,
            size=1024,
            file_count=file_count
        ))

        self.mount_a.run_python(create_script)

        # That the dirfrag object is created
        self.fs.mds_asok(["flush", "journal"])
        dir_ino = self.mount_a.path_to_ino("delete_me")
        self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))

        # Remove everything
        self.mount_a.run_shell(["rm", "-rf", "delete_me"])
        self.fs.mds_asok(["flush", "journal"])

        # That all the removed files get created as strays
        strays = self.get_mdc_stat("strays_created")
        self.assertEqual(strays, file_count + 1)
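        # (file_count files plus one: the "delete_me" directory inode itself
        # also becomes a stray when it is removed)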

        # That the strays all get enqueued for purge
        self.wait_until_equal(
            lambda: self.get_mdc_stat("strays_enqueued"),
            strays,
            timeout=600
        )

        # That all the purge operations execute
        self.wait_until_equal(
            lambda: self.get_stat("purge_queue", "pq_executed"),
            strays,
            timeout=600
        )

        # That finally, the directory metadata object is gone
        self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))

        # That finally, the data objects are all gone
        self.await_data_pool_empty()

    def _test_throttling(self, throttle_type):
        self.data_log = []
        try:
            return self._do_test_throttling(throttle_type)
        except:
            for l in self.data_log:
                log.info(",".join([l_.__str__() for l_ in l]))
            raise

    def _do_test_throttling(self, throttle_type):
        """
        That the mds_max_purge_ops setting is respected
        """

        def set_throttles(files, ops):
            """
            Helper for updating ops/files limits, and calculating effective
            ops_per_pg setting to give the same ops limit.
            """
            self.set_conf('mds', 'mds_max_purge_files', "%d" % files)
            self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops)

            pgs = self.fs.mon_manager.get_pool_int_property(
                self.fs.get_data_pool_name(),
                "pg_num"
            )
            ops_per_pg = float(ops) / pgs
            self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg)
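        # For example (hypothetical numbers): ops=16 against a data pool with
        # 8 PGs sets mds_max_purge_ops_per_pg to 2.0, so the per-PG setting
        # implies the same 16-op aggregate budget.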

        # Test conditions depend on what we're going to be exercising.
        # * Lift the threshold on whatever throttle we are *not* testing, so
        #   that the throttle of interest is the one that will be the bottleneck
        # * Create either many small files (test file count throttling) or fewer
        #   large files (test op throttling)
        if throttle_type == self.OPS_THROTTLE:
            set_throttles(files=100000000, ops=16)
            size_unit = 1024 * 1024  # big files, generate lots of ops
            file_multiplier = 100
        elif throttle_type == self.FILES_THROTTLE:
            # The default value of file limit is pretty permissive, so to avoid
            # the test running too fast, create lots of files and set the limit
            # pretty low.
            set_throttles(ops=100000000, files=6)
            size_unit = 1024  # small, numerous files
            file_multiplier = 200
        else:
            raise NotImplementedError(throttle_type)

        # Pick up config changes
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        create_script = dedent("""
            import os

            mountpoint = "{mountpoint}"
            subdir = "delete_me"
            size_unit = {size_unit}
            file_multiplier = {file_multiplier}
            os.mkdir(os.path.join(mountpoint, subdir))
            for i in range(0, file_multiplier):
                for size in range(0, {size_range}*size_unit, size_unit):
                    filename = "{{0}}_{{1}}.bin".format(i, size // size_unit)
                    with open(os.path.join(mountpoint, subdir, filename), 'w') as f:
                        f.write(size * 'x')
        """.format(
            mountpoint=self.mount_a.mountpoint,
            size_unit=size_unit,
            file_multiplier=file_multiplier,
            size_range=self.throttle_workload_size_range
        ))

        self.mount_a.run_python(create_script)

        # We will run the deletion in the background, to reduce the risk of it completing before
        # we have started monitoring the stray statistics.
        def background():
            self.mount_a.run_shell(["rm", "-rf", "delete_me"])
            self.fs.mds_asok(["flush", "journal"])

        background_thread = gevent.spawn(background)

        total_inodes = file_multiplier * self.throttle_workload_size_range + 1
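        # (+1: the "delete_me" directory inode itself also becomes a stray)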
        mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds'))
        mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds'))

        # During this phase we look for the concurrent ops to exceed half
        # the limit (a heuristic) and not exceed the limit (a correctness
        # condition).
        purge_timeout = 600
        elapsed = 0
        files_high_water = 0
        ops_high_water = 0

        while True:
            stats = self.fs.mds_asok(['perf', 'dump'])
            mdc_stats = stats['mds_cache']
            pq_stats = stats['purge_queue']
            if elapsed >= purge_timeout:
                raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))

            num_strays = mdc_stats['num_strays']
            num_strays_purging = pq_stats['pq_executing']
            num_purge_ops = pq_stats['pq_executing_ops']
            files_high_water = pq_stats['pq_executing_high_water']
            ops_high_water = pq_stats['pq_executing_ops_high_water']

            self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops, files_high_water, ops_high_water])

            total_strays_created = mdc_stats['strays_created']
            total_strays_purged = pq_stats['pq_executed']

            if total_strays_purged == total_inodes:
                log.info("Complete purge in {0} seconds".format(elapsed))
                break
            elif total_strays_purged > total_inodes:
                raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))
            else:
                if throttle_type == self.OPS_THROTTLE:
                    # 11 is filer_max_purge_ops plus one for the backtrace:
                    # limit is allowed to be overshot by this much.
                    if num_purge_ops > mds_max_purge_ops + 11:
                        raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
                            num_purge_ops, mds_max_purge_ops
                        ))
                elif throttle_type == self.FILES_THROTTLE:
                    if num_strays_purging > mds_max_purge_files:
                        raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
                            num_strays_purging, mds_max_purge_files
                        ))
                else:
                    raise NotImplementedError(throttle_type)

                log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
                    num_strays_purging, num_strays,
                    total_strays_purged, total_strays_created
                ))
                time.sleep(1)
                elapsed += 1

        background_thread.join()

        # Check that we got up to a respectable rate during the purge.  This is totally
        # racy, but should be safeish unless the cluster is pathologically slow, or
        # insanely fast such that the deletions all pass before we have polled the
        # statistics.
        if throttle_type == self.OPS_THROTTLE:
            if ops_high_water < mds_max_purge_ops // 2:
                raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
                    ops_high_water, mds_max_purge_ops
                ))
            # The MDS may go over mds_max_purge_ops for some items, like a
            # heavily fragmented directory.  The throttle does not kick in
            # until *after* we reach or exceed the limit.  This is expected
            # because we don't want to starve the PQ or never purge a
            # particularly large file/directory.
            self.assertLessEqual(ops_high_water, mds_max_purge_ops + 64)
        elif throttle_type == self.FILES_THROTTLE:
            if files_high_water < mds_max_purge_files // 2:
                raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
                    files_high_water, mds_max_purge_files
                ))
            self.assertLessEqual(files_high_water, mds_max_purge_files)

        # Sanity check all MDC stray stats
        stats = self.fs.mds_asok(['perf', 'dump'])
        mdc_stats = stats['mds_cache']
        pq_stats = stats['purge_queue']
        self.assertEqual(mdc_stats['num_strays'], 0)
        self.assertEqual(mdc_stats['num_strays_delayed'], 0)
        self.assertEqual(pq_stats['pq_executing'], 0)
        self.assertEqual(pq_stats['pq_executing_ops'], 0)
        self.assertEqual(mdc_stats['strays_created'], total_inodes)
        self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
        self.assertEqual(pq_stats['pq_executed'], total_inodes)

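    # The helpers below read MDS perf counters over the admin socket; this is
    # roughly equivalent to running `ceph daemon mds.<id> perf dump <subsys> <name>`.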
    def get_mdc_stat(self, name, mds_id=None):
        return self.get_stat("mds_cache", name, mds_id)

    def get_stat(self, subsys, name, mds_id=None):
        return self.fs.mds_asok(['perf', 'dump', subsys, name],
                                mds_id=mds_id)[subsys][name]

    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
                          mds_id=None):
        self.wait_until_equal(
            lambda: self.get_stat(subsys, counter, mds_id),
            expect_val=expect_val, timeout=timeout,
            reject_fn=lambda x: x > expect_val
        )

    def test_open_inode(self):
        """
        That the case of a dentry unlinked while a client holds an
        inode open is handled correctly.

        The inode should be moved into a stray dentry, while the original
        dentry and directory should be purged.

        The inode's data should be purged when the client eventually closes
        it.
        """
        mount_a_client_id = self.mount_a.get_global_id()

        # Write some bytes to a file
        size_mb = 8

        # Hold the file open
        p = self.mount_a.open_background("open_file")
        self.mount_a.write_n_mb("open_file", size_mb)
        open_file_ino = self.mount_a.path_to_ino("open_file")

        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # Unlink the dentry
        self.mount_a.run_shell(["rm", "-f", "open_file"])

        # Wait to see the stray count increment
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)

        # See that while the stray count has incremented, none have passed
        # on to the purge queue
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that the client still holds 2 caps
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # See that the data objects remain in the data pool
        self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))

        # Now close the file
        self.mount_a.kill_background(p)

        # Wait to see the client cap count decrement
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
        )
        # Wait to see the purge counter increment, stray count go to zero
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=6, reject_fn=lambda x: x > 1
        )
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        # See that the data objects no longer exist
        self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))

        self.await_data_pool_empty()

    def test_reintegration_limit(self):
        """
        That the reintegration is not blocked by full directories.
        """

        LOW_LIMIT = 50
        self.config_set('mds', 'mds_bal_fragment_size_max', str(LOW_LIMIT))
        time.sleep(10)  # for config to reach MDS; async create is fast!!

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.mount_a.run_shell_payload("""
            mkdir a b
            for i in `seq 1 50`; do
                touch a/"$i"
                ln a/"$i" b/"$i"
            done
            sync -f a b
            rm a/*
            """)

        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)

    def test_hardlink_reintegration(self):
        """
        That removal of the primary dentry of a hardlinked inode results
        in reintegration of the inode into the previously-remote dentry,
        rather than lingering as a stray indefinitely.
        """
        # Write some bytes to file_a
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "dir_1"])
        self.mount_a.write_n_mb("dir_1/file_a", size_mb)
        ino = self.mount_a.path_to_ino("dir_1/file_a")

        # Create a hardlink named file_b
        self.mount_a.run_shell(["mkdir", "dir_2"])
        self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
        self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)

        # Flush journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the file_a path
        pre_unlink_bt = self.fs.read_backtrace(ino)
        self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")

        # Empty the MDS cache; otherwise the MDS reintegrates the stray as
        # soon as the unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount_wait()

        # Unlink file_a
        self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])

        # See that a stray was created
        self.assertEqual(self.get_mdc_stat("num_strays"), 1)
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)

        # Wait, and see that the data objects are still present (i.e. that the
        # stray did not advance to purging given time)
        time.sleep(30)
        self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that before reintegration, the inode's backtrace points to a stray dir
        self.fs.mds_asok(['flush', 'journal'])
        self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")

        # Do a metadata operation on the remaining link (mv is heavy handed, but
        # others like touch may be satisfied from caps without poking the MDS)
        self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the eval_remote call
        # on responding to a client request.
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the remaining link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")

        # The MDS should reintegrate the stray when the unlink finishes
        self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the notify_stray call
        # on completion of unlink
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the newest link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")

        # Now really delete it
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()
        self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))

        # We caused the inode to go stray 3 times
        self.assertEqual(self.get_mdc_stat("strays_created"), 3)
        # We purged it at the last
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)

    def test_reintegration_via_scrub(self):
        """
        That reintegration is triggered via recursive scrub.
        """

        self.mount_a.run_shell_payload("""
            mkdir -p a b
            for i in `seq 1 50`; do
                touch a/"$i"
                ln a/"$i" b/"$i"
            done
            sync -f .
            """)

        self.mount_a.remount()  # drop caps/cache
        self.fs.rank_tell(["flush", "journal"])
        self.fs.rank_fail()
        self.fs.wait_for_daemons()

        # only / in cache, reintegration cannot happen
        self.wait_until_equal(
            lambda: len(self.fs.rank_tell(["dump", "tree", "/"])),
            expect_val=3,
            timeout=60
        )

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.mount_a.run_shell_payload("""
            rm a/*
            sync -f .
            """)
        self.wait_until_equal(
            lambda: len(self.fs.rank_tell(["dump", "tree", "/"])),
            expect_val=3,
            timeout=60
        )
        self.assertEqual(self.get_mdc_stat("num_strays"), 50)
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertEqual(last_reintegrated, curr_reintegrated)

        self.fs.rank_tell(["scrub", "start", "/", "recursive,force"])

        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        # N.B.: reintegrate (rename RPC) may be tried multiple times from different code paths
        self.assertGreaterEqual(curr_reintegrated, last_reintegrated + 50)

    def test_mv_hardlink_cleanup(self):
        """
        That when doing a rename from A to B, and B has hardlinks,
        we make a stray for B, which is then reintegrated
        into one of its hardlinks.
        """
        # Create file_a, file_b, and a hardlink to file_b
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")

        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
        self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)

        # mv file_a file_b
        self.mount_a.run_shell(["mv", "file_a", "file_b"])

        # Stray reintegration should happen as a result of the notify_stray call on
        # completion of rename
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)

        # No data objects should have been deleted, as both files still have linkage.
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
        self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))

        self.fs.mds_asok(['flush', 'journal'])

        post_reint_bt = self.fs.read_backtrace(file_b_ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")

    def _setup_two_ranks(self):
        # Set up two MDSs
        self.fs.set_max_mds(2)

        # See that we have two active MDSs
        self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
                              reject_fn=lambda v: v > 2 or v < 1)

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]
        log.info("Ranks 0 and 1 are {0} and {1}".format(
            rank_0_id, rank_1_id))

        # Get rid of other MDS daemons so that it's easier to know which
        # daemons to expect in which ranks after restarts
        for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}:
            self.mds_cluster.mds_stop(unneeded_mds)
            self.mds_cluster.mds_fail(unneeded_mds)

        return rank_0_id, rank_1_id

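    # _force_migrate pins a path to an MDS rank via the ceph.dir.pin vxattr
    # and then waits until the subtree map shows the export has completed.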
    def _force_migrate(self, path, rank=1):
        """
        :param path: Filesystem path (string) to migrate
        :param rank: MDS rank to pin the path to
        :return: None
        """
        self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", str(rank), path])
        rpath = "/" + path
        self._wait_subtrees([(rpath, rank)], rank=rank, path=rpath)

    def _is_stopped(self, rank):
        mds_map = self.fs.get_mds_map()
        return rank not in [i['rank'] for i in mds_map['info'].values()]

    def test_purge_on_shutdown(self):
        """
        That when an MDS rank is shut down, its purge queue is
        drained in the process.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 5

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate("delete_me")

        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        self.mount_a.umount_wait()

        # See all the strays go into the purge queue
        self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
        self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
        self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)

        # See that nothing gets purged from the purge queue (yet)
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)

        # Shut down rank 1
        self.fs.set_max_mds(1)

        # It shouldn't proceed past stopping because it's still not allowed
        # to purge
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
        self.assertFalse(self._is_stopped(1))

        # Permit the daemon to start purging again
        self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
                                            'injectargs',
                                            "--mds_max_purge_files 100")

        # It should now proceed through shutdown
        self.fs.wait_for_daemons(timeout=120)

        # ...and in the process purge all that data
        self.await_data_pool_empty()

    def test_migration_on_shutdown(self):
        """
        That when an MDS rank is shut down, any non-purgeable strays
        get migrated to another rank.
        """

        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Create a non-purgeable stray in a ~mds1 stray directory
        # by doing a hard link and deleting the original file
        self.mount_a.run_shell_payload("""
            mkdir dir_1 dir_2
            touch dir_1/original
            ln dir_1/original dir_2/linkto
            """)

        self._force_migrate("dir_1")
        self._force_migrate("dir_2", rank=0)

        # Empty the MDS cache; otherwise the MDS reintegrates the stray as
        # soon as the unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'], rank_1_id)
        self.fs.mds_asok(['cache', 'drop'], rank_1_id)

        self.mount_a.mount_wait()
        self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
        self.mount_a.umount_wait()

        self._wait_for_counter("mds_cache", "strays_created", 1,
                               mds_id=rank_1_id)

        # Shut down rank 1
        self.fs.set_max_mds(1)
        self.fs.wait_for_daemons(timeout=120)

        # See that the stray counter on rank 0 has incremented
        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)

    def assert_backtrace(self, ino, expected_path):
        """
        Assert that the backtrace in the data pool for an inode matches
        an expected /foo/bar path.
        """
        expected_elements = expected_path.strip("/").split("/")
        bt = self.fs.read_backtrace(ino)
        actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']]))
        self.assertListEqual(expected_elements, actual_elements)

    def get_backtrace_path(self, ino):
        bt = self.fs.read_backtrace(ino)
        elements = reversed([dn['dname'] for dn in bt['ancestors']])
        return "/".join(elements)

    def assert_purge_idle(self):
        """
        Assert that the MDS perf counters indicate no strays exist and
        no ongoing purge activity.  Sanity check for when PurgeQueue should
        be idle.
        """
        mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
        pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
        self.assertEqual(mdc_stats["num_strays"], 0)
        self.assertEqual(mdc_stats["num_strays_delayed"], 0)
        self.assertEqual(pq_stats["pq_executing"], 0)
        self.assertEqual(pq_stats["pq_executing_ops"], 0)

    def test_mv_cleanup(self):
        """
        That when doing a rename from A to B, and B has no hardlinks,
        we make a stray for B and purge it.
        """
        # Create file_a and file_b, write some data to both
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")
        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_a")
        self.assert_backtrace(file_b_ino, "file_b")

        # mv file_a file_b
        self.mount_a.run_shell(['mv', 'file_a', 'file_b'])

        # See that the stray counter increments
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        # Wait for the purge counter to increment
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()

        # file_b should have been purged
        self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))

        # Backtrace should have updated from file_a to file_b
        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_b")

        # file_a's data should still exist
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))

    def _pool_df(self, pool_name):
        """
        Return a dict like
            {
                "kb_used": 0,
                "bytes_used": 0,
                "max_avail": 19630292406,
                "objects": 0
            }

        :param pool_name: Which pool (must exist)
        """
        out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")
        for p in json.loads(out)['pools']:
            if p['name'] == pool_name:
                return p['stats']

        raise RuntimeError("Pool '{0}' not found".format(pool_name))

    def await_data_pool_empty(self):
        self.wait_until_true(
            lambda: self._pool_df(
                self.fs.get_data_pool_name()
            )['objects'] == 0,
            timeout=60)

    def test_snapshot_remove(self):
        """
        That removal of a snapshot that references a now-unlinked file results
        in purging of the stray for the file.
        """
        # Enable snapshots
        self.fs.set_allow_new_snaps(True)

        # Create a dir with a file in it
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "snapdir"])
        self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
        self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
        file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")

        # Snapshot the dir
        self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])

        # Cause the head revision to deviate from the snapshot
        self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)

        # Flush the journal so that backtraces, dirfrag objects will actually be written
        self.fs.mds_asok(["flush", "journal"])

        # Unlink the file
        self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
        self.mount_a.run_shell(["rmdir", "snapdir/subdir"])

        # Unmount the client so that when we later check that the data is still
        # in the file, we are not just seeing the client's page cache.
        self.mount_a.umount_wait()

        self.assertEqual(self.get_mdc_stat("strays_created"), 2)

        # FIXME: at this stage we see a purge and the stray count drops to
        # zero, but there's actually still a stray, so at the very
        # least the StrayManager stats code is slightly off

        self.mount_a.mount_wait()

        # See that the data from the snapshotted revision of the file is still present
        # and correct
        self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)

        # Remove the snapshot
        self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])

        # Purging file_a doesn't happen until after we've flushed the journal, because
        # it is referenced by the snapshotted subdir, and the snapshot isn't really
        # gone until the journal references to it are gone
        self.fs.mds_asok(["flush", "journal"])

        # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs.
        # See also: http://tracker.ceph.com/issues/20072
        self.wait_until_true(
            lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024),
            timeout=60
        )

        # See that a purge happens now
        self._wait_for_counter("mds_cache", "strays_enqueued", 2)
        self._wait_for_counter("purge_queue", "pq_executed", 2)

        self.await_data_pool_empty()

    def test_fancy_layout(self):
        """
        Purge a stray file with a fancy (striped) layout.
        """

        file_name = "fancy_layout_file"
        self.mount_a.run_shell(["touch", file_name])

        file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
        self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)

        # 35MB requires 7 objects
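        # (an object set is stripe_count * object_size = 4 * 8MB = 32MB, so a
        # 35MB file fills 4 full objects plus 3 partly-written objects in the
        # next object set)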
        size_mb = 35
        self.mount_a.write_n_mb(file_name, size_mb)

        self.mount_a.run_shell(["rm", "-f", file_name])
        self.fs.mds_asok(["flush", "journal"])

        # can't use self.fs.data_objects_absent here, it does not support fancy layout
        self.await_data_pool_empty()

    def test_dirfrag_limit(self):
        """
        That the directory fragment size cannot exceed
        mds_bal_fragment_size_max (using a limit of 50 in all configurations).
        """

        LOW_LIMIT = 50
        self.config_set('mds', 'mds_bal_fragment_size_max', str(LOW_LIMIT))
        time.sleep(10)  # for config to reach MDS; async create is fast!!

        try:
            self.mount_a.create_n_files("subdir/file", LOW_LIMIT + 1, finaldirsync=True)
        except CommandFailedError:
            pass  # ENOSPC
        else:
            self.fail("fragment size exceeded")

    def test_dirfrag_limit_fragmented(self):
        """
        That fragmentation (forced) will allow more entries to be created.
        """

        LOW_LIMIT = 50
        self.config_set('mds', 'mds_bal_fragment_size_max', str(LOW_LIMIT))
        self.config_set('mds', 'mds_bal_merge_size', 1)  # disable merging
        time.sleep(10)  # for config to reach MDS; async create is fast!!

        # Test that we can go beyond the limit if we fragment the directory
        self.mount_a.create_n_files("subdir/file", LOW_LIMIT, finaldirsync=True)
        self.mount_a.umount_wait()  # release client caps

        # Ensure that subdir is fragmented
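        # ("dirfrag split <path> <frag> <bits>": splitting frag 0/0 by one bit
        # should yield two child fragments, each subject to its own size limit)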
        self.fs.rank_asok(["dirfrag", "split", "/subdir", "0/0", "1"])
        self.fs.rank_asok(["flush", "journal"])

        # Create 50% more files than the current fragment limit
        self.mount_a.mount_wait()
        self.mount_a.create_n_files("subdir/file", (LOW_LIMIT * 3) // 2, finaldirsync=True)

    def test_dirfrag_limit_strays(self):
        """
        That unlinking fails when the stray directory fragment becomes too
        large and that unlinking may continue once those strays are purged.
        """

        LOW_LIMIT = 10
        # N.B. this test is inherently racy because stray removal may be faster
        # than slow(er) file creation.
        self.config_set('mds', 'mds_bal_fragment_size_max', LOW_LIMIT)
        time.sleep(10)  # for config to reach MDS; async create is fast!!

        # Now test that the stray directory size is limited and recovers
        strays_before = self.get_mdc_stat("strays_created")
        try:
            # 10 stray directories: expect collisions
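            # (the MDS keeps 10 stray subdirectories, stray0..stray9, so
            # LOW_LIMIT*10 unlinks should overflow at least one of them)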
            self.mount_a.create_n_files("subdir/file", LOW_LIMIT * 10, finaldirsync=True, unlink=True)
        except CommandFailedError:
            pass  # ENOSPC
        else:
            self.fail("fragment size exceeded")
        strays_after = self.get_mdc_stat("strays_created")
        self.assertGreaterEqual(strays_after - strays_before, LOW_LIMIT)

        self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
        self._wait_for_counter("purge_queue", "pq_executed", strays_after)

        # verify new files can be created and unlinked
        self.mount_a.create_n_files("subdir/file", LOW_LIMIT, dirsync=True, unlink=True)

    def test_purge_queue_upgrade(self):
        """
        That when starting on a system with no purge queue in the metadata
        pool, we silently create one.
        :return:
        """

        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
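        # 500.00000000 is the header object of the purge queue journal
        # (journal inode 0x500) in the metadata pool; removing it simulates a
        # filesystem created before the purge queue existed.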
        self.fs.radosm(["rm", "500.00000000"])
        self.mds_cluster.mds_restart()
        self.fs.wait_for_daemons()

    def test_replicated_delete_speed(self):
        """
        That deletions of replicated metadata are not pathologically slow
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 10

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate("delete_me")

        begin = datetime.datetime.now()
        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        end = datetime.datetime.now()

        # What we're really checking here is that we are completing client
        # operations immediately rather than delaying until the next tick.
        tick_period = float(self.fs.get_config("mds_tick_interval",
                                               service_type="mds"))

        duration = (end - begin).total_seconds()
        self.assertLess(duration, (file_count * tick_period) * 0.25)