import json
import time
import logging
from textwrap import dedent
import datetime
import gevent
from teuthology.orchestra.run import CommandFailedError, Raw
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology

log = logging.getLogger(__name__)


class TestStrays(CephFSTestCase):
    MDSS_REQUIRED = 2

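    # _test_throttling is parameterized on which purge throttle it exercises:
    # OPS_THROTTLE drives the RADOS-operation limit (mds_max_purge_ops) and
    # FILES_THROTTLE drives the concurrent-file limit (mds_max_purge_files).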
    OPS_THROTTLE = 1
    FILES_THROTTLE = 2

    # Range of different file sizes used in throttle test's workload
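    # (each workload iteration writes one file at every multiple of size_unit
    # from 0 up to this value, i.e. 16 files per iteration)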
    throttle_workload_size_range = 16

    @for_teuthology
    def test_ops_throttle(self):
        self._test_throttling(self.OPS_THROTTLE)

    @for_teuthology
    def test_files_throttle(self):
        self._test_throttling(self.FILES_THROTTLE)

    def test_dir_deletion(self):
        """
        That when deleting a bunch of dentries and the containing
        directory, everything gets purged.
        Catches cases where the client might e.g. fail to trim
        the unlinked dir from its cache.
        """
        file_count = 1000
        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size = {size}
            file_count = {file_count}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_count):
                filename = "{{0}}_{{1}}.bin".format(i, size)
                f = open(os.path.join(mount_path, subdir, filename), 'w')
                f.write(size * 'x')
                f.close()
        """.format(
            mount_path=self.mount_a.mountpoint,
            size=1024,
            file_count=file_count
        ))

        self.mount_a.run_python(create_script)

        # That the dirfrag object is created
        self.fs.mds_asok(["flush", "journal"])
        dir_ino = self.mount_a.path_to_ino("delete_me")
        self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))

        # Remove everything
        self.mount_a.run_shell(["rm", "-rf", "delete_me"])
        self.fs.mds_asok(["flush", "journal"])

        # That all the removed files get created as strays
        strays = self.get_mdc_stat("strays_created")
        self.assertEqual(strays, file_count + 1)

        # That the strays all get enqueued for purge
        self.wait_until_equal(
            lambda: self.get_mdc_stat("strays_enqueued"),
            strays,
            timeout=600
        )

        # That all the purge operations execute
        self.wait_until_equal(
            lambda: self.get_stat("purge_queue", "pq_executed"),
            strays,
            timeout=600
        )

        # That finally, the directory metadata object is gone
        self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))

        # That finally, the data objects are all gone
        self.await_data_pool_empty()

    def _test_throttling(self, throttle_type):
        self.data_log = []
        try:
            return self._do_test_throttling(throttle_type)
        except:
            for l in self.data_log:
                log.info(",".join([str(l_) for l_ in l]))
            raise

    def _do_test_throttling(self, throttle_type):
        """
        That the mds_max_purge_ops setting is respected
        """

        def set_throttles(files, ops):
            """
            Helper for updating ops/files limits, and calculating effective
            ops_per_pg setting to give the same ops limit.
            """
            self.set_conf('mds', 'mds_max_purge_files', "%d" % files)
            self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops)

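            # The MDS also derives a purge-ops ceiling from
            # mds_max_purge_ops_per_pg * pg_num; dividing by the pg count here
            # keeps that derived ceiling equal to the absolute mds_max_purge_ops
            # value, so whichever of the two limits the MDS applies, the
            # effective throttle is the same.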
            pgs = self.fs.mon_manager.get_pool_property(
                self.fs.get_data_pool_name(),
                "pg_num"
            )
            ops_per_pg = float(ops) / pgs
            self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg)

        # Test conditions depend on what we're going to be exercising.
        # * Lift the threshold on whatever throttle we are *not* testing, so
        #   that the throttle of interest is the one that will be the bottleneck
        # * Create either many small files (test file count throttling) or fewer
        #   large files (test op throttling)
        if throttle_type == self.OPS_THROTTLE:
            set_throttles(files=100000000, ops=16)
            size_unit = 1024 * 1024  # big files, generate lots of ops
            file_multiplier = 100
        elif throttle_type == self.FILES_THROTTLE:
            # The default value of file limit is pretty permissive, so to avoid
            # the test running too fast, create lots of files and set the limit
            # pretty low.
            set_throttles(ops=100000000, files=6)
            size_unit = 1024  # small, numerous files
            file_multiplier = 200
        else:
            raise NotImplementedError(throttle_type)

        # Pick up config changes
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size_unit = {size_unit}
            file_multiplier = {file_multiplier}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_multiplier):
                for size in xrange(0, {size_range}*size_unit, size_unit):
                    filename = "{{0}}_{{1}}.bin".format(i, size / size_unit)
                    f = open(os.path.join(mount_path, subdir, filename), 'w')
                    f.write(size * 'x')
                    f.close()
        """.format(
            mount_path=self.mount_a.mountpoint,
            size_unit=size_unit,
            file_multiplier=file_multiplier,
            size_range=self.throttle_workload_size_range
        ))

        self.mount_a.run_python(create_script)

        # We will run the deletion in the background, to reduce the risk of it completing before
        # we have started monitoring the stray statistics.
        def background():
            self.mount_a.run_shell(["rm", "-rf", "delete_me"])
            self.fs.mds_asok(["flush", "journal"])

        background_thread = gevent.spawn(background)

        total_inodes = file_multiplier * self.throttle_workload_size_range + 1
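        # +1: the "delete_me" directory inode itself also becomes a stray
        # when it is removed, on top of the files inside it.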
        mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds'))
        mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds'))

        # During this phase we look for the concurrent ops to exceed half
        # the limit (a heuristic) and not exceed the limit (a correctness
        # condition).
        purge_timeout = 600
        elapsed = 0
        files_high_water = 0
        ops_high_water = 0

        while True:
            stats = self.fs.mds_asok(['perf', 'dump'])
            mdc_stats = stats['mds_cache']
            pq_stats = stats['purge_queue']
            if elapsed >= purge_timeout:
                raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))

            num_strays = mdc_stats['num_strays']
            num_strays_purging = pq_stats['pq_executing']
            num_purge_ops = pq_stats['pq_executing_ops']
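            # pq_executing is the number of stray inodes the purge queue is
            # currently deleting; pq_executing_ops is the number of RADOS
            # operations in flight for those deletions.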

            self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops])

            files_high_water = max(files_high_water, num_strays_purging)
            ops_high_water = max(ops_high_water, num_purge_ops)

            total_strays_created = mdc_stats['strays_created']
            total_strays_purged = pq_stats['pq_executed']

            if total_strays_purged == total_inodes:
                log.info("Complete purge in {0} seconds".format(elapsed))
                break
            elif total_strays_purged > total_inodes:
                raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))
            else:
                if throttle_type == self.OPS_THROTTLE:
                    # 11 is filer_max_purge_ops plus one for the backtrace:
                    # limit is allowed to be overshot by this much.
                    if num_purge_ops > mds_max_purge_ops + 11:
                        raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
                            num_purge_ops, mds_max_purge_ops
                        ))
                elif throttle_type == self.FILES_THROTTLE:
                    if num_strays_purging > mds_max_purge_files:
                        raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
                            num_strays_purging, mds_max_purge_files
                        ))
                else:
                    raise NotImplementedError(throttle_type)

                log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
                    num_strays_purging, num_strays,
                    total_strays_purged, total_strays_created
                ))
                time.sleep(1)
                elapsed += 1

        background_thread.join()

        # Check that we got up to a respectable rate during the purge. This is totally
        # racy, but should be safeish unless the cluster is pathologically slow, or
        # insanely fast such that the deletions all pass before we have polled the
        # statistics.
        if throttle_type == self.OPS_THROTTLE:
            if ops_high_water < mds_max_purge_ops / 2:
                raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
                    ops_high_water, mds_max_purge_ops
                ))
        elif throttle_type == self.FILES_THROTTLE:
            if files_high_water < mds_max_purge_files / 2:
                raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
                    files_high_water, mds_max_purge_files
                ))

        # Sanity check all MDC stray stats
        stats = self.fs.mds_asok(['perf', 'dump'])
        mdc_stats = stats['mds_cache']
        pq_stats = stats['purge_queue']
        self.assertEqual(mdc_stats['num_strays'], 0)
        self.assertEqual(mdc_stats['num_strays_delayed'], 0)
        self.assertEqual(pq_stats['pq_executing'], 0)
        self.assertEqual(pq_stats['pq_executing_ops'], 0)
        self.assertEqual(mdc_stats['strays_created'], total_inodes)
        self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
        self.assertEqual(pq_stats['pq_executed'], total_inodes)

    def get_mdc_stat(self, name, mds_id=None):
        return self.get_stat("mds_cache", name, mds_id)

    def get_stat(self, subsys, name, mds_id=None):
        return self.fs.mds_asok(['perf', 'dump', subsys, name],
                                mds_id=mds_id)[subsys][name]

    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
                          mds_id=None):
        self.wait_until_equal(
            lambda: self.get_stat(subsys, counter, mds_id),
            expect_val=expect_val, timeout=timeout,
            reject_fn=lambda x: x > expect_val
        )

    def test_open_inode(self):
        """
        That the case of a dentry unlinked while a client holds an
        inode open is handled correctly.

        The inode should be moved into a stray dentry, while the original
        dentry and directory should be purged.

        The inode's data should be purged when the client eventually closes
        it.
        """
        mount_a_client_id = self.mount_a.get_global_id()

        # Write some bytes to a file
        size_mb = 8

        # Hold the file open
        p = self.mount_a.open_background("open_file")
        self.mount_a.write_n_mb("open_file", size_mb)
        open_file_ino = self.mount_a.path_to_ino("open_file")

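        # Expect exactly two caps on this session: assumed to be the mount's
        # root directory plus the open "open_file" inode.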
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # Unlink the dentry
        self.mount_a.run_shell(["rm", "-f", "open_file"])

        # Wait to see the stray count increment
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)

        # See that while the stray count has incremented, none have passed
        # on to the purge queue
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that the client still holds 2 caps
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # See that the data objects remain in the data pool
        self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))

        # Now close the file
        self.mount_a.kill_background(p)

        # Wait to see the client cap count decrement
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
        )
        # Wait to see the purge counter increment, stray count go to zero
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=6, reject_fn=lambda x: x > 1
        )
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        # See that the data objects no longer exist
        self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))

        self.await_data_pool_empty()

    def test_hardlink_reintegration(self):
        """
        That removal of primary dentry of hardlinked inode results
        in reintegration of inode into the previously-remote dentry,
        rather than lingering as a stray indefinitely.
        """
        # Write some bytes to file_a
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        ino = self.mount_a.path_to_ino("file_a")

        # Create a hardlink named file_b
        self.mount_a.run_shell(["ln", "file_a", "file_b"])
        self.assertEqual(self.mount_a.path_to_ino("file_b"), ino)

        # Flush journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that backtrace for the file points to the file_a path
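        # (ancestors[] runs from the inode outwards, so ancestors[0] is the
        # dentry under which the inode is directly linked)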
        pre_unlink_bt = self.fs.read_backtrace(ino)
        self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")

        # Unlink file_a
        self.mount_a.run_shell(["rm", "-f", "file_a"])

        # See that a stray was created
        self.assertEqual(self.get_mdc_stat("num_strays"), 1)
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)

        # Wait, see that data objects are still present (i.e. that the
        # stray did not advance to purging given time)
        time.sleep(30)
        self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that before reintegration, the inode's backtrace points to a stray dir
        self.fs.mds_asok(['flush', 'journal'])
        self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))

        # Do a metadata operation on the remaining link (mv is heavy handed, but
        # others like touch may be satisfied from caps without poking MDS)
        self.mount_a.run_shell(["mv", "file_b", "file_c"])

        # See the reintegration counter increment
        # This should happen as a result of the eval_remote call on
        # responding to a client request.
        self._wait_for_counter("mds_cache", "strays_reintegrated", 1)

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the remaining link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")

        # See that the number of strays in existence is zero
        self.assertEqual(self.get_mdc_stat("num_strays"), 0)

        # Now really delete it
        self.mount_a.run_shell(["rm", "-f", "file_c"])
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()
        self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))

        # We caused the inode to go stray twice
        self.assertEqual(self.get_mdc_stat("strays_created"), 2)
        # One time we reintegrated it
        self.assertEqual(self.get_mdc_stat("strays_reintegrated"), 1)
        # Then the second time we purged it
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)

    def test_mv_hardlink_cleanup(self):
        """
        That when doing a rename from A to B, and B has hardlinks,
        then we make a stray for B which is then reintegrated
        into one of its hardlinks.
        """
        # Create file_a, file_b, and a hardlink to file_b
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")

        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
        self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)

        # mv file_a file_b
        self.mount_a.run_shell(["mv", "file_a", "file_b"])

        self.fs.mds_asok(['flush', 'journal'])

        # Initially, linkto_b will still be a remote inode pointing to a newly created
        # stray from when file_b was unlinked due to the 'mv'. No data objects should
        # have been deleted, as both files still have linkage.
        self.assertEqual(self.get_mdc_stat("num_strays"), 1)
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertTrue(self.get_backtrace_path(file_b_ino).startswith("stray"))
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
        self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))

        # Trigger reintegration and wait for it to happen
        self.assertEqual(self.get_mdc_stat("strays_reintegrated"), 0)
        self.mount_a.run_shell(["mv", "linkto_b", "file_c"])
        self._wait_for_counter("mds_cache", "strays_reintegrated", 1)

        self.fs.mds_asok(['flush', 'journal'])

        post_reint_bt = self.fs.read_backtrace(file_b_ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")
        self.assertEqual(self.get_mdc_stat("num_strays"), 0)

    def _setup_two_ranks(self):
        # Set up two MDSs
        self.fs.set_allow_multimds(True)
        self.fs.set_max_mds(2)

        # See that we have two active MDSs
        self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
                              reject_fn=lambda v: v > 2 or v < 1)

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]
        log.info("Ranks 0 and 1 are {0} and {1}".format(
            rank_0_id, rank_1_id))

        # Get rid of other MDS daemons so that it's easier to know which
        # daemons to expect in which ranks after restarts
        for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}:
            self.mds_cluster.mds_stop(unneeded_mds)
            self.mds_cluster.mds_fail(unneeded_mds)

        return rank_0_id, rank_1_id

    def _force_migrate(self, from_id, to_id, path, watch_ino):
        """
        :param from_id: MDS id currently containing metadata
        :param to_id: MDS id to move it to
        :param path: Filesystem path (string) to move
        :param watch_ino: Inode number to look for at destination to confirm move
        :return: None
        """
        result = self.fs.mds_asok(["export", "dir", path, "1"], from_id)
        self.assertEqual(result["return_code"], 0)

        # Poll the MDS cache dump to watch for the export completing
        migrated = False
        migrate_timeout = 60
        migrate_elapsed = 0
        while not migrated:
            data = self.fs.mds_asok(["dump", "cache"], to_id)
            for inode_data in data:
                if inode_data['ino'] == watch_ino:
                    log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
                    if inode_data['is_auth'] is True:
                        migrated = True
                    break

            if not migrated:
                if migrate_elapsed > migrate_timeout:
                    raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed))
                else:
                    migrate_elapsed += 1
                    time.sleep(1)

    def _is_stopped(self, rank):
        mds_map = self.fs.get_mds_map()
        return rank not in [i['rank'] for i in mds_map['info'].values()]

    def test_purge_on_shutdown(self):
        """
        That when an MDS rank is shut down, its purge queue is
        drained in the process.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

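        # With mds_max_purge_files=0, rank 1 can enqueue strays but will never
        # execute any purges, so work accumulates in its purge queue.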
        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 5

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_0_id, rank_1_id, "/delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        self.mount_a.umount_wait()

        # See all the strays go into purge queue
        self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
        self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
        self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)

        # See nothing get purged from the purge queue (yet)
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)

        # Shut down rank 1
        self.fs.set_max_mds(1)
        self.fs.deactivate(1)

        # It shouldn't proceed past stopping because it's still not allowed
        # to purge
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
        self.assertFalse(self._is_stopped(1))

        # Permit the daemon to start purging again
        self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
                                            'injectargs',
                                            "--mds_max_purge_files 100")

        # It should now proceed through shutdown
        self.wait_until_true(
            lambda: self._is_stopped(1),
            timeout=60
        )

        # ...and in the process purge all that data
        self.await_data_pool_empty()

    def test_migration_on_shutdown(self):
        """
        That when an MDS rank is shut down, any non-purgeable strays
        get migrated to another rank.
        """

        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Create a non-purgeable stray in a ~mds1 stray directory
        # by doing a hard link and deleting the original file
        self.mount_a.run_shell(["mkdir", "mydir"])
        self.mount_a.run_shell(["touch", "mydir/original"])
        self.mount_a.run_shell(["ln", "mydir/original", "mydir/linkto"])

        self._force_migrate(rank_0_id, rank_1_id, "/mydir",
                            self.mount_a.path_to_ino("mydir/original"))

        self.mount_a.run_shell(["rm", "-f", "mydir/original"])
        self.mount_a.umount_wait()

        self._wait_for_counter("mds_cache", "strays_created", 1,
                               mds_id=rank_1_id)

        # Shut down rank 1
        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "1")
        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'deactivate', "1")

        # Wait until we get to a single active MDS mdsmap state
        self.wait_until_true(lambda: self._is_stopped(1), timeout=120)

        # See that the stray counter on rank 0 has incremented
        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)

    def assert_backtrace(self, ino, expected_path):
        """
        Assert that the backtrace in the data pool for an inode matches
        an expected /foo/bar path.
        """
        expected_elements = expected_path.strip("/").split("/")
        bt = self.fs.read_backtrace(ino)
        actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']]))
        self.assertListEqual(expected_elements, actual_elements)

    def get_backtrace_path(self, ino):
        bt = self.fs.read_backtrace(ino)
        elements = reversed([dn['dname'] for dn in bt['ancestors']])
        return "/".join(elements)

    def assert_purge_idle(self):
        """
        Assert that the MDS perf counters indicate no strays exist and
        no ongoing purge activity. Sanity check for when PurgeQueue should
        be idle.
        """
        mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
        pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
        self.assertEqual(mdc_stats["num_strays"], 0)
        self.assertEqual(mdc_stats["num_strays_delayed"], 0)
        self.assertEqual(pq_stats["pq_executing"], 0)
        self.assertEqual(pq_stats["pq_executing_ops"], 0)

    def test_mv_cleanup(self):
        """
        That when doing a rename from A to B, and B has no hardlinks,
        then we make a stray for B and purge it.
        """
        # Create file_a and file_b, write some to both
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")
        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_a")
        self.assert_backtrace(file_b_ino, "file_b")

        # mv file_a file_b
        self.mount_a.run_shell(['mv', 'file_a', 'file_b'])

        # See that stray counter increments
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        # Wait for purge counter to increment
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()

        # file_b should have been purged
        self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))

        # Backtrace should have updated from file_a to file_b
        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_b")

        # file_a's data should still exist
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))

    def _pool_df(self, pool_name):
        """
        Return a dict like
            {
                "kb_used": 0,
                "bytes_used": 0,
                "max_avail": 19630292406,
                "objects": 0
            }

        :param pool_name: Which pool (must exist)
        """
        out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")
        for p in json.loads(out)['pools']:
            if p['name'] == pool_name:
                return p['stats']

        raise RuntimeError("Pool '{0}' not found".format(pool_name))

    def await_data_pool_empty(self):
        self.wait_until_true(
            lambda: self._pool_df(
                self.fs.get_data_pool_name()
            )['objects'] == 0,
            timeout=60)

    def test_snapshot_remove(self):
        """
        That removal of a snapshot that references a now-unlinked file results
        in purging of the stray for the file.
        """
        # Enable snapshots
        self.fs.mon_manager.raw_cluster_cmd("mds", "set", "allow_new_snaps", "true",
                                            "--yes-i-really-mean-it")

        # Create a dir with a file in it
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "snapdir"])
        self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
        self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
        file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")

        # Snapshot the dir
        self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])

        # Cause the head revision to deviate from the snapshot
        self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)

        # Flush the journal so that backtraces, dirfrag objects will actually be written
        self.fs.mds_asok(["flush", "journal"])

        # Unlink the file
        self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
        self.mount_a.run_shell(["rmdir", "snapdir/subdir"])

        # Unmount the client because when I come back to check the data is still
        # in the file I don't want to just see what's in the page cache.
        self.mount_a.umount_wait()

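        # Two strays by now: one for the unlinked file_a and one for the
        # removed subdir.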
        self.assertEqual(self.get_mdc_stat("strays_created"), 2)

        # FIXME: at this stage we see a purge and the stray count drops to
        # zero, but there's actually still a stray, so at the very
        # least the StrayManager stats code is slightly off

        self.mount_a.mount()

        # See that the data from the snapshotted revision of the file is still present
        # and correct
        self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)

        # Remove the snapshot
        self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])
        self.mount_a.umount_wait()

        # Purging file_a doesn't happen until after we've flushed the journal, because
        # it is referenced by the snapshotted subdir, and the snapshot isn't really
        # gone until the journal references to it are gone
        self.fs.mds_asok(["flush", "journal"])

        # See that a purge happens now
        self._wait_for_counter("mds_cache", "strays_enqueued", 2)
        self._wait_for_counter("purge_queue", "pq_executed", 2)

        self.assertTrue(self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024))
        self.await_data_pool_empty()

    def test_fancy_layout(self):
        """
        purge stray file with fancy layout
        """

        file_name = "fancy_layout_file"
        self.mount_a.run_shell(["touch", file_name])

        file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
        self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)

        # 35MB requires 7 objects
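        # (stripe_unit=1MB striped across stripe_count=4 objects, each growing
        # to object_size=8MB: the first 32MB fills objects 0-3, the remaining
        # 3MB touches three objects of the next object set, giving 7 in total)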
        size_mb = 35
        self.mount_a.write_n_mb(file_name, size_mb)

        self.mount_a.run_shell(["rm", "-f", file_name])
        self.fs.mds_asok(["flush", "journal"])

        # can't use self.fs.data_objects_absent here, it does not support fancy layout
        self.await_data_pool_empty()

    def test_dirfrag_limit(self):
        """
        That the directory fragment size cannot exceed mds_bal_fragment_size_max (using a limit of 50 in all configurations).

        That fragmentation (forced) will allow more entries to be created.

        That unlinking fails when the stray directory fragment becomes too large and that unlinking may continue once those strays are purged.
        """

        self.fs.set_allow_dirfrags(True)

        LOW_LIMIT = 50
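        # mds_bal_fragment_size_max caps the number of entries in a single
        # dirfrag; creating more than that without splitting the fragment
        # should be rejected, surfacing to the client as ENOSPC.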
        for mds in self.fs.get_daemon_names():
            self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds)

        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir")
                os.mkdir(path)
                for n in range(0, {file_count}):
                    open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
                """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT+1
            )))
        except CommandFailedError:
            pass # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        # Now test that we can go beyond the limit if we fragment the directory

        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            os.mkdir(path)
            for n in range(0, {file_count}):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            dfd = os.open(path, os.O_DIRECTORY)
            os.fsync(dfd)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

        # Ensure that subdir2 is fragmented
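        # ("dirfrag split <path> <frag> <bits>": splitting fragment 0/0 of
        # /subdir2 by one bit yields two fragments)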
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id)

        # remount+flush (release client caps)
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # Create 50% more files than the current fragment limit
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            for n in range({file_count}, ({file_count}*3)//2):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

        # Now test the stray directory size is limited and recovers
        strays_before = self.get_mdc_stat("strays_created")
        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir3")
                os.mkdir(path)
                for n in range({file_count}):
                    fpath = os.path.join(path, "%s" % n)
                    f = open(fpath, 'w')
                    f.write("%s" % n)
                    f.close()
                    os.unlink(fpath)
                """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT*10 # 10 stray directories, should collide before this count
            )))
        except CommandFailedError:
            pass # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        strays_after = self.get_mdc_stat("strays_created")
        self.assertGreaterEqual(strays_after-strays_before, LOW_LIMIT)

        self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
        self._wait_for_counter("purge_queue", "pq_executed", strays_after)

        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir4")
            os.mkdir(path)
            for n in range({file_count}):
                fpath = os.path.join(path, "%s" % n)
                f = open(fpath, 'w')
                f.write("%s" % n)
                f.close()
                os.unlink(fpath)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

    def test_purge_queue_upgrade(self):
        """
        That when starting on a system with no purge queue in the metadata
        pool, we silently create one.
        :return:
        """

        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
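        # 500.00000000 is assumed to be the header object of the purge queue's
        # journal (inode 0x500 in the metadata pool); deleting it simulates a
        # filesystem created before the purge queue existed.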
        self.fs.rados(["rm", "500.00000000"])
        self.mds_cluster.mds_restart()
        self.fs.wait_for_daemons()

    def test_purge_queue_op_rate(self):
        """
        A busy purge queue is meant to aggregate operations sufficiently
        that our RADOS ops to the metadata pool are not O(files). Check
        that this is the case.
        :return:
        """

        # For low rates of deletion, the rate of metadata ops actually
        # will be O(files), so to see the desired behaviour we have to give
        # the system a significant quantity, i.e. an order of magnitude
        # more than the number of files it will purge at one time.

        max_purge_files = 2

        self.set_conf('mds', 'mds_max_purge_files', "%d" % max_purge_files)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        phase_1_files = 256
        phase_2_files = 512

        self.mount_a.run_shell(["mkdir", "phase1"])
        self.mount_a.create_n_files("phase1/file", phase_1_files)

        self.mount_a.run_shell(["mkdir", "phase2"])
        self.mount_a.create_n_files("phase2/file", phase_2_files)

        def unlink_and_count_ops(path, expected_deletions):
            initial_ops = self.get_stat("objecter", "op")
            initial_pq_executed = self.get_stat("purge_queue", "pq_executed")

            self.mount_a.run_shell(["rm", "-rf", path])

            self._wait_for_counter(
                "purge_queue", "pq_executed", initial_pq_executed + expected_deletions
            )

            final_ops = self.get_stat("objecter", "op")

            # Calculation of the *overhead* operations, i.e. do not include
            # the operations where we actually delete files.
            return final_ops - initial_ops - expected_deletions

        self.fs.mds_asok(['flush', 'journal'])
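        # expected_deletions below is the file count plus one: removing the
        # phase directory itself also creates a stray.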
        phase1_ops = unlink_and_count_ops("phase1/", phase_1_files + 1)

        self.fs.mds_asok(['flush', 'journal'])
        phase2_ops = unlink_and_count_ops("phase2/", phase_2_files + 1)

        log.info("Phase 1: {0}".format(phase1_ops))
        log.info("Phase 2: {0}".format(phase2_ops))

        # The success criterion is that deleting double the number
        # of files doesn't generate double the number of overhead ops
        # -- this comparison is a rough approximation of that rule.
        self.assertTrue(phase2_ops < phase1_ops * 1.25)

        # Finally, check that our activity did include properly quiescing
        # the queue (i.e. call to Journaler::write_head in the right place),
        # by restarting the MDS and checking that it doesn't try re-executing
        # any of the work we did.
        self.fs.mds_asok(['flush', 'journal'])  # flush to ensure no strays
                                                # hanging around
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed"), 0)