import json
import time
import logging
from textwrap import dedent
import datetime
import gevent

from teuthology.orchestra.run import CommandFailedError, Raw
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology

log = logging.getLogger(__name__)


class TestStrays(CephFSTestCase):
    MDSS_REQUIRED = 2

    OPS_THROTTLE = 1
    FILES_THROTTLE = 2

    # Range of different file sizes used in throttle test's workload
    throttle_workload_size_range = 16

    @for_teuthology
    def test_ops_throttle(self):
        self._test_throttling(self.OPS_THROTTLE)

    @for_teuthology
    def test_files_throttle(self):
        self._test_throttling(self.FILES_THROTTLE)

    def test_dir_deletion(self):
        """
        That when deleting a bunch of dentries and the containing
        directory, everything gets purged.
        Catches cases where the client might e.g. fail to trim
        the unlinked dir from its cache.
        """
        file_count = 1000
        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size = {size}
            file_count = {file_count}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_count):
                filename = "{{0}}_{{1}}.bin".format(i, size)
                f = open(os.path.join(mount_path, subdir, filename), 'w')
                f.write(size * 'x')
                f.close()
            """.format(
            mount_path=self.mount_a.mountpoint,
            size=1024,
            file_count=file_count
        ))

        self.mount_a.run_python(create_script)

        # That the dirfrag object is created
        self.fs.mds_asok(["flush", "journal"])
        dir_ino = self.mount_a.path_to_ino("delete_me")
        self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))

        # Remove everything
        self.mount_a.run_shell(["rm", "-rf", "delete_me"])
        self.fs.mds_asok(["flush", "journal"])

        # That all the removed files get created as strays
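        # (the +1 below accounts for the unlinked "delete_me" directory itself,
        # which becomes a stray alongside the file_count files)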
        strays = self.get_mdc_stat("strays_created")
        self.assertEqual(strays, file_count + 1)

        # That the strays all get enqueued for purge
        self.wait_until_equal(
            lambda: self.get_mdc_stat("strays_enqueued"),
            strays,
            timeout=600
        )

        # That all the purge operations execute
        self.wait_until_equal(
            lambda: self.get_stat("purge_queue", "pq_executed"),
            strays,
            timeout=600
        )

        # That finally, the directory metadata object is gone
        self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))

        # That finally, the data objects are all gone
        self.await_data_pool_empty()

    def _test_throttling(self, throttle_type):
        self.data_log = []
        try:
            return self._do_test_throttling(throttle_type)
        except:
            for l in self.data_log:
                log.info(",".join([l_.__str__() for l_ in l]))
            raise

    def _do_test_throttling(self, throttle_type):
        """
        That the mds_max_purge_ops setting is respected
        """

        def set_throttles(files, ops):
            """
            Helper for updating ops/files limits, and calculating effective
            ops_per_pg setting to give the same ops limit.
            """
            self.set_conf('mds', 'mds_max_purge_files', "%d" % files)
            self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops)

            pgs = self.fs.mon_manager.get_pool_property(
                self.fs.get_data_pool_name(),
                "pg_num"
            )
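            # Choose ops_per_pg so that ops_per_pg * pg_num == ops, i.e. the per-PG
            # setting expresses the same aggregate limit as mds_max_purge_ops.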
            ops_per_pg = float(ops) / pgs
            self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg)

        # Test conditions depend on what we're going to be exercising.
        # * Lift the threshold on whatever throttle we are *not* testing, so
        #   that the throttle of interest is the one that will be the bottleneck
        # * Create either many small files (test file count throttling) or fewer
        #   large files (test op throttling)
        if throttle_type == self.OPS_THROTTLE:
            set_throttles(files=100000000, ops=16)
            size_unit = 1024 * 1024  # big files, generate lots of ops
            file_multiplier = 100
        elif throttle_type == self.FILES_THROTTLE:
            # The default value of file limit is pretty permissive, so to avoid
            # the test running too fast, create lots of files and set the limit
            # pretty low.
            set_throttles(ops=100000000, files=6)
            size_unit = 1024  # small, numerous files
            file_multiplier = 200
        else:
            raise NotImplementedError(throttle_type)

        # Pick up config changes
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size_unit = {size_unit}
            file_multiplier = {file_multiplier}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_multiplier):
                for size in xrange(0, {size_range}*size_unit, size_unit):
                    filename = "{{0}}_{{1}}.bin".format(i, size / size_unit)
                    f = open(os.path.join(mount_path, subdir, filename), 'w')
                    f.write(size * 'x')
                    f.close()
            """.format(
            mount_path=self.mount_a.mountpoint,
            size_unit=size_unit,
            file_multiplier=file_multiplier,
            size_range=self.throttle_workload_size_range
        ))

        self.mount_a.run_python(create_script)

        # We will run the deletion in the background, to reduce the risk of it completing before
        # we have started monitoring the stray statistics.
        def background():
            self.mount_a.run_shell(["rm", "-rf", "delete_me"])
            self.fs.mds_asok(["flush", "journal"])

        background_thread = gevent.spawn(background)

        total_inodes = file_multiplier * self.throttle_workload_size_range + 1
        mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds'))
        mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds'))

        # During this phase we look for the concurrent ops to exceed half
        # the limit (a heuristic) and not exceed the limit (a correctness
        # condition).
        purge_timeout = 600
        elapsed = 0
        files_high_water = 0
        ops_high_water = 0

        while True:
            stats = self.fs.mds_asok(['perf', 'dump'])
            mdc_stats = stats['mds_cache']
            pq_stats = stats['purge_queue']
            if elapsed >= purge_timeout:
                raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))

            num_strays = mdc_stats['num_strays']
            num_strays_purging = pq_stats['pq_executing']
            num_purge_ops = pq_stats['pq_executing_ops']
            files_high_water = pq_stats['pq_executing_high_water']
            ops_high_water = pq_stats['pq_executing_ops_high_water']

            self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops, files_high_water, ops_high_water])

            total_strays_created = mdc_stats['strays_created']
            total_strays_purged = pq_stats['pq_executed']

            if total_strays_purged == total_inodes:
                log.info("Complete purge in {0} seconds".format(elapsed))
                break
            elif total_strays_purged > total_inodes:
                raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))
            else:
                if throttle_type == self.OPS_THROTTLE:
                    # 11 is filer_max_purge_ops plus one for the backtrace:
                    # limit is allowed to be overshot by this much.
                    if num_purge_ops > mds_max_purge_ops + 11:
                        raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
                            num_purge_ops, mds_max_purge_ops
                        ))
                elif throttle_type == self.FILES_THROTTLE:
                    if num_strays_purging > mds_max_purge_files:
                        raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
                            num_strays_purging, mds_max_purge_files
                        ))
                else:
                    raise NotImplementedError(throttle_type)

                log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
                    num_strays_purging, num_strays,
                    total_strays_purged, total_strays_created
                ))
                time.sleep(1)
                elapsed += 1

        background_thread.join()

        # Check that we got up to a respectable rate during the purge. This is totally
        # racy, but should be safeish unless the cluster is pathologically slow, or
        # insanely fast such that the deletions all pass before we have polled the
        # statistics.
        if throttle_type == self.OPS_THROTTLE:
            if ops_high_water < mds_max_purge_ops / 2:
                raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
                    ops_high_water, mds_max_purge_ops
                ))
            # The MDS may go over mds_max_purge_ops for some items, like a
            # heavily fragmented directory. The throttle does not kick in
            # until *after* we reach or exceed the limit. This is expected
            # because we don't want to starve the PQ or never purge a
            # particularly large file/directory.
            self.assertLessEqual(ops_high_water, mds_max_purge_ops + 64)
        elif throttle_type == self.FILES_THROTTLE:
            if files_high_water < mds_max_purge_files / 2:
                raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
                    files_high_water, mds_max_purge_files
                ))
            self.assertLessEqual(files_high_water, mds_max_purge_files)

        # Sanity check all MDC stray stats
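        # Note: these counters are cumulative for the MDS process, so the equality
        # checks below rely on the mds_fail_restart() earlier in this test having
        # reset them to zero.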
        stats = self.fs.mds_asok(['perf', 'dump'])
        mdc_stats = stats['mds_cache']
        pq_stats = stats['purge_queue']
        self.assertEqual(mdc_stats['num_strays'], 0)
        self.assertEqual(mdc_stats['num_strays_delayed'], 0)
        self.assertEqual(pq_stats['pq_executing'], 0)
        self.assertEqual(pq_stats['pq_executing_ops'], 0)
        self.assertEqual(mdc_stats['strays_created'], total_inodes)
        self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
        self.assertEqual(pq_stats['pq_executed'], total_inodes)

    def get_mdc_stat(self, name, mds_id=None):
        return self.get_stat("mds_cache", name, mds_id)

    def get_stat(self, subsys, name, mds_id=None):
        return self.fs.mds_asok(['perf', 'dump', subsys, name],
                                mds_id=mds_id)[subsys][name]

    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
                          mds_id=None):
        self.wait_until_equal(
            lambda: self.get_stat(subsys, counter, mds_id),
            expect_val=expect_val, timeout=timeout,
            reject_fn=lambda x: x > expect_val
        )

    def test_open_inode(self):
        """
        That the case of a dentry unlinked while a client holds an
        inode open is handled correctly.

        The inode should be moved into a stray dentry, while the original
        dentry and directory should be purged.

        The inode's data should be purged when the client eventually closes
        it.
        """
        mount_a_client_id = self.mount_a.get_global_id()

        # Write some bytes to a file
        size_mb = 8

        # Hold the file open
        p = self.mount_a.open_background("open_file")
        self.mount_a.write_n_mb("open_file", size_mb)
        open_file_ino = self.mount_a.path_to_ino("open_file")

        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # Unlink the dentry
        self.mount_a.run_shell(["rm", "-f", "open_file"])

        # Wait to see the stray count increment
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)

        # See that while the stray count has incremented, none have passed
        # on to the purge queue
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that the client still holds 2 caps
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # See that the data objects remain in the data pool
        self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))

        # Now close the file
        self.mount_a.kill_background(p)

        # Wait to see the client cap count decrement
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
        )
        # Wait to see the purge counter increment, stray count go to zero
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=6, reject_fn=lambda x: x > 1
        )
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        # See that the data objects no longer exist
        self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))

        self.await_data_pool_empty()

    def test_hardlink_reintegration(self):
        """
        That removal of the primary dentry of a hardlinked inode results
        in reintegration of the inode into the previously-remote dentry,
        rather than lingering as a stray indefinitely.
        """
        # Write some bytes to file_a
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "dir_1"])
        self.mount_a.write_n_mb("dir_1/file_a", size_mb)
        ino = self.mount_a.path_to_ino("dir_1/file_a")

        # Create a hardlink named file_b
        self.mount_a.run_shell(["mkdir", "dir_2"])
        self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
        self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)

        # Flush journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that backtrace for the file points to the file_a path
        pre_unlink_bt = self.fs.read_backtrace(ino)
        self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")

        # Empty the MDS cache; otherwise the MDS reintegrates the stray when the unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount()

        # Unlink file_a
        self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])

        # See that a stray was created
        self.assertEqual(self.get_mdc_stat("num_strays"), 1)
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)

        # Wait, see that data objects are still present (i.e. that the
        # stray did not advance to purging given time)
        time.sleep(30)
        self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that before reintegration, the inode's backtrace points to a stray dir
        self.fs.mds_asok(['flush', 'journal'])
        self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")

        # Do a metadata operation on the remaining link (mv is heavy handed, but
        # others like touch may be satisfied from caps without poking MDS)
        self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the eval_remote call
        # on responding to a client request.
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the remaining link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")

        # The MDS should reintegrate the stray when the unlink finishes
        self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the notify_stray call
        # on completion of unlink
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the newest link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")

        # Now really delete it
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()
        self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))

        # We caused the inode to go stray 3 times
        self.assertEqual(self.get_mdc_stat("strays_created"), 3)
        # We purged it at the last
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)

    def test_mv_hardlink_cleanup(self):
        """
        That when doing a rename from A to B, and B has hardlinks,
        then we make a stray for B which is then reintegrated
        into one of its hardlinks.
        """
        # Create file_a, file_b, and a hardlink to file_b
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")

        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
        self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)

        # mv file_a file_b
        self.mount_a.run_shell(["mv", "file_a", "file_b"])

        # Stray reintegration should happen as a result of the notify_stray call on
        # completion of rename
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)

        # No data objects should have been deleted, as both files still have linkage.
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
        self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))

        self.fs.mds_asok(['flush', 'journal'])

        post_reint_bt = self.fs.read_backtrace(file_b_ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")

    def _setup_two_ranks(self):
        # Set up two MDSs
        self.fs.set_max_mds(2)

        # See that we have two active MDSs
        self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
                              reject_fn=lambda v: v > 2 or v < 1)

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]
        log.info("Ranks 0 and 1 are {0} and {1}".format(
            rank_0_id, rank_1_id))

        # Get rid of other MDS daemons so that it's easier to know which
        # daemons to expect in which ranks after restarts
        for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}:
            self.mds_cluster.mds_stop(unneeded_mds)
            self.mds_cluster.mds_fail(unneeded_mds)

        return rank_0_id, rank_1_id

    def _force_migrate(self, to_id, path, watch_ino):
        """
        :param to_id: MDS id to move it to
        :param path: Filesystem path (string) to move
        :param watch_ino: Inode number to look for at destination to confirm move
        :return: None
        """
        self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path])

        # Poll the MDS cache dump to watch for the export completing
        migrated = False
        migrate_timeout = 60
        migrate_elapsed = 0
        while not migrated:
            data = self.fs.mds_asok(["dump", "cache"], to_id)
            for inode_data in data:
                if inode_data['ino'] == watch_ino:
                    log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
                    if inode_data['is_auth'] is True:
                        migrated = True
                    break

            if not migrated:
                if migrate_elapsed > migrate_timeout:
                    raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed))
                else:
                    migrate_elapsed += 1
                    time.sleep(1)

    def _is_stopped(self, rank):
        mds_map = self.fs.get_mds_map()
        return rank not in [i['rank'] for i in mds_map['info'].values()]

    def test_purge_on_shutdown(self):
        """
        That when an MDS rank is shut down, its purge queue is
        drained in the process.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 5

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        self.mount_a.umount_wait()

        # See all the strays go into purge queue
        self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
        self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
        self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)

        # See nothing get purged from the purge queue (yet)
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)

        # Shut down rank 1
        self.fs.set_max_mds(1)

        # It shouldn't proceed past stopping because it's still not allowed
        # to purge
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
        self.assertFalse(self._is_stopped(1))

        # Permit the daemon to start purging again
        self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
                                            'injectargs',
                                            "--mds_max_purge_files 100")

        # It should now proceed through shutdown
        self.fs.wait_for_daemons(timeout=120)

        # ...and in the process purge all that data
        self.await_data_pool_empty()

    def test_migration_on_shutdown(self):
        """
        That when an MDS rank is shut down, any non-purgeable strays
        get migrated to another rank.
        """

        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Create a non-purgeable stray in a ~mds1 stray directory
        # by doing a hard link and deleting the original file
        self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"])
        self.mount_a.run_shell(["touch", "dir_1/original"])
        self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"])

        self._force_migrate(rank_1_id, "dir_1",
                            self.mount_a.path_to_ino("dir_1/original"))

        # Empty the MDS cache; otherwise the MDS reintegrates the stray when the unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'], rank_0_id)
        self.fs.mds_asok(['flush', 'journal'], rank_1_id)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]

        self.mount_a.mount()

        self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
        self.mount_a.umount_wait()

        self._wait_for_counter("mds_cache", "strays_created", 1,
                               mds_id=rank_1_id)

        # Shut down rank 1
        self.fs.set_max_mds(1)
        self.fs.wait_for_daemons(timeout=120)

        # See that the stray counter on rank 0 has incremented
        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)

    def assert_backtrace(self, ino, expected_path):
        """
        Assert that the backtrace in the data pool for an inode matches
        an expected /foo/bar path.
        """
        expected_elements = expected_path.strip("/").split("/")
        bt = self.fs.read_backtrace(ino)
        actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']]))
        self.assertListEqual(expected_elements, actual_elements)

    def get_backtrace_path(self, ino):
        bt = self.fs.read_backtrace(ino)
        elements = reversed([dn['dname'] for dn in bt['ancestors']])
        return "/".join(elements)

    def assert_purge_idle(self):
        """
        Assert that the MDS perf counters indicate no strays exist and
        no ongoing purge activity. Sanity check for when PurgeQueue should
        be idle.
        """
        mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
        pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
        self.assertEqual(mdc_stats["num_strays"], 0)
        self.assertEqual(mdc_stats["num_strays_delayed"], 0)
        self.assertEqual(pq_stats["pq_executing"], 0)
        self.assertEqual(pq_stats["pq_executing_ops"], 0)

    def test_mv_cleanup(self):
        """
        That when doing a rename from A to B, and B has no hardlinks,
        then we make a stray for B and purge it.
        """
        # Create file_a and file_b, write some to both
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")
        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_a")
        self.assert_backtrace(file_b_ino, "file_b")

        # mv file_a file_b
        self.mount_a.run_shell(['mv', 'file_a', 'file_b'])

        # See that stray counter increments
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        # Wait for purge counter to increment
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()

        # file_b should have been purged
        self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))

        # Backtrace should have updated from file_a to file_b
        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_b")

        # file_a's data should still exist
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))

    def _pool_df(self, pool_name):
        """
        Return a dict like
            {
                "kb_used": 0,
                "bytes_used": 0,
                "max_avail": 19630292406,
                "objects": 0
            }

        :param pool_name: Which pool (must exist)
        """
        out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")
        for p in json.loads(out)['pools']:
            if p['name'] == pool_name:
                return p['stats']

        raise RuntimeError("Pool '{0}' not found".format(pool_name))

    def await_data_pool_empty(self):
        self.wait_until_true(
            lambda: self._pool_df(
                self.fs.get_data_pool_name()
            )['objects'] == 0,
            timeout=60)

    def test_snapshot_remove(self):
        """
        That removal of a snapshot that references a now-unlinked file results
        in purging of the stray for the file.
        """
        # Enable snapshots
        self.fs.set_allow_new_snaps(True)

        # Create a dir with a file in it
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "snapdir"])
        self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
        self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
        file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")

        # Snapshot the dir
        self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])

        # Cause the head revision to deviate from the snapshot
        self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)

        # Flush the journal so that backtraces, dirfrag objects will actually be written
        self.fs.mds_asok(["flush", "journal"])

        # Unlink the file
        self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
        self.mount_a.run_shell(["rmdir", "snapdir/subdir"])

        # Unmount the client because when I come back to check the data is still
        # in the file I don't want to just see what's in the page cache.
        self.mount_a.umount_wait()

        self.assertEqual(self.get_mdc_stat("strays_created"), 2)

        # FIXME: at this stage we see a purge and the stray count drops to
        # zero, but there's actually still a stray, so at the very
        # least the StrayManager stats code is slightly off

        self.mount_a.mount()

        # See that the data from the snapshotted revision of the file is still present
        # and correct
        self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)

        # Remove the snapshot
        self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])

        # Purging file_a doesn't happen until after we've flushed the journal, because
        # it is referenced by the snapshotted subdir, and the snapshot isn't really
        # gone until the journal references to it are gone
        self.fs.mds_asok(["flush", "journal"])

        # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs.
        # See also: http://tracker.ceph.com/issues/20072
        self.wait_until_true(
            lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024),
            timeout=60
        )

        # See that a purge happens now
        self._wait_for_counter("mds_cache", "strays_enqueued", 2)
        self._wait_for_counter("purge_queue", "pq_executed", 2)

        self.await_data_pool_empty()

    def test_fancy_layout(self):
        """
        purge stray file with fancy layout
        """

        file_name = "fancy_layout_file"
        self.mount_a.run_shell(["touch", file_name])

        file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
        self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)

        # 35MB requires 7 objects
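        # (one full 4-object stripe set of 32MB, plus 3 x 1MB stripe units that
        # land in the first three objects of the next set)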
        size_mb = 35
        self.mount_a.write_n_mb(file_name, size_mb)

        self.mount_a.run_shell(["rm", "-f", file_name])
        self.fs.mds_asok(["flush", "journal"])

        # can't use self.fs.data_objects_absent here, it does not support fancy layout
        self.await_data_pool_empty()

    def test_dirfrag_limit(self):
        """
        That the directory fragment size cannot exceed mds_bal_fragment_size_max (using a limit of 50 in all configurations).

        That fragmentation (forced) will allow more entries to be created.

        That unlinking fails when the stray directory fragment becomes too large and that unlinking may continue once those strays are purged.
        """

        LOW_LIMIT = 50
        for mds in self.fs.get_daemon_names():
            self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds)

        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir")
                os.mkdir(path)
                for n in range(0, {file_count}):
                    open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
                """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT + 1
            )))
        except CommandFailedError:
            pass  # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        # Now test that we can go beyond the limit if we fragment the directory

        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            os.mkdir(path)
            for n in range(0, {file_count}):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            dfd = os.open(path, os.O_DIRECTORY)
            os.fsync(dfd)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

        # Ensure that subdir2 is fragmented
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id)

        # remount+flush (release client caps)
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # Create 50% more files than the current fragment limit
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            for n in range({file_count}, ({file_count}*3)//2):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

        # Now test the stray directory size is limited and recovers
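        # The MDS keeps multiple stray directories (ten, per the comment on
        # file_count below); unlinked inodes are spread across them, so
        # LOW_LIMIT*10 unlinks should overflow at least one stray fragment.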
        strays_before = self.get_mdc_stat("strays_created")
        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir3")
                os.mkdir(path)
                for n in range({file_count}):
                    fpath = os.path.join(path, "%s" % n)
                    f = open(fpath, 'w')
                    f.write("%s" % n)
                    f.close()
                    os.unlink(fpath)
                """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT * 10  # 10 stray directories, should collide before this count
            )))
        except CommandFailedError:
            pass  # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        strays_after = self.get_mdc_stat("strays_created")
        self.assertGreaterEqual(strays_after - strays_before, LOW_LIMIT)

        self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
        self._wait_for_counter("purge_queue", "pq_executed", strays_after)

        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir4")
            os.mkdir(path)
            for n in range({file_count}):
                fpath = os.path.join(path, "%s" % n)
                f = open(fpath, 'w')
                f.write("%s" % n)
                f.close()
                os.unlink(fpath)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

    def test_purge_queue_upgrade(self):
        """
        That when starting on a system with no purge queue in the metadata
        pool, we silently create one.
        :return:
        """

        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
        self.fs.rados(["rm", "500.00000000"])
        self.mds_cluster.mds_restart()
        self.fs.wait_for_daemons()

    def test_replicated_delete_speed(self):
        """
        That deletions of replicated metadata are not pathologically slow
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 10

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        begin = datetime.datetime.now()
        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        end = datetime.datetime.now()

        # What we're really checking here is that we are completing client
        # operations immediately rather than delaying until the next tick.
        tick_period = float(self.fs.get_config("mds_tick_interval",
                                               service_type="mds"))

        duration = (end - begin).total_seconds()
        self.assertLess(duration, (file_count * tick_period) * 0.25)