import json
import time
import logging
from textwrap import dedent
import datetime
import gevent

from teuthology.orchestra.run import CommandFailedError, Raw
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology

log = logging.getLogger(__name__)


class TestStrays(CephFSTestCase):
    MDSS_REQUIRED = 2

    OPS_THROTTLE = 1
    FILES_THROTTLE = 2

    # Range of different file sizes used in throttle test's workload
    throttle_workload_size_range = 16

    @for_teuthology
    def test_ops_throttle(self):
        self._test_throttling(self.OPS_THROTTLE)

    @for_teuthology
    def test_files_throttle(self):
        self._test_throttling(self.FILES_THROTTLE)

    def test_dir_deletion(self):
        """
        That when deleting a bunch of dentries and the containing
        directory, everything gets purged.
        Catches cases where the client might e.g. fail to trim
        the unlinked dir from its cache.
        """
        file_count = 1000
        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size = {size}
            file_count = {file_count}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in range(0, file_count):
                filename = "{{0}}_{{1}}.bin".format(i, size)
                with open(os.path.join(mount_path, subdir, filename), 'w') as f:
                    f.write(size * 'x')
            """.format(
            mount_path=self.mount_a.mountpoint,
            size=1024,
            file_count=file_count
        ))

        self.mount_a.run_python(create_script)

        # That the dirfrag object is created
        self.fs.mds_asok(["flush", "journal"])
        dir_ino = self.mount_a.path_to_ino("delete_me")
        self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))

        # Remove everything
        self.mount_a.run_shell(["rm", "-rf", "delete_me"])
        self.fs.mds_asok(["flush", "journal"])

        # That all the removed files get created as strays
        strays = self.get_mdc_stat("strays_created")
        self.assertEqual(strays, file_count + 1)

        # That the strays all get enqueued for purge
        self.wait_until_equal(
            lambda: self.get_mdc_stat("strays_enqueued"),
            strays,
            timeout=600
        )

        # That all the purge operations execute
        self.wait_until_equal(
            lambda: self.get_stat("purge_queue", "pq_executed"),
            strays,
            timeout=600
        )

        # That finally, the directory metadata object is gone
        self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))

        # That finally, the data objects are all gone
        self.await_data_pool_empty()

    def _test_throttling(self, throttle_type):
        self.data_log = []
        try:
            return self._do_test_throttling(throttle_type)
        except:
            for l in self.data_log:
                log.info(",".join([str(l_) for l_ in l]))
            raise

    def _do_test_throttling(self, throttle_type):
        """
        That the mds_max_purge_ops setting is respected
        """

        def set_throttles(files, ops):
            """
            Helper for updating ops/files limits, and calculating effective
            ops_per_pg setting to give the same ops limit.
            """
            self.set_conf('mds', 'mds_max_purge_files', "%d" % files)
            self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops)

            pgs = self.fs.mon_manager.get_pool_int_property(
                self.fs.get_data_pool_name(),
                "pg_num"
            )
            ops_per_pg = float(ops) / pgs
            self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg)

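        # Worked example for set_throttles (hypothetical numbers): with ops=16
        # and a data pool created with pg_num=32, ops_per_pg is set to
        # 16 / 32 = 0.5, so the per-PG knob resolves to the same effective
        # 16-op ceiling as mds_max_purge_ops and neither limit dominates.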
        # Test conditions depend on what we're going to be exercising.
        # * Lift the threshold on whatever throttle we are *not* testing, so
        #   that the throttle of interest is the one that will be the bottleneck
        # * Create either many small files (test file count throttling) or fewer
        #   large files (test op throttling)
        if throttle_type == self.OPS_THROTTLE:
            set_throttles(files=100000000, ops=16)
            size_unit = 1024 * 1024  # big files, generate lots of ops
            file_multiplier = 100
        elif throttle_type == self.FILES_THROTTLE:
            # The default value of the file limit is pretty permissive, so to avoid
            # the test running too fast, create lots of files and set the limit
            # pretty low.
            set_throttles(ops=100000000, files=6)
            size_unit = 1024  # small, numerous files
            file_multiplier = 200
        else:
            raise NotImplementedError(throttle_type)

        # Pick up config changes
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size_unit = {size_unit}
            file_multiplier = {file_multiplier}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in range(0, file_multiplier):
                for size in range(0, {size_range}*size_unit, size_unit):
                    filename = "{{0}}_{{1}}.bin".format(i, size // size_unit)
                    with open(os.path.join(mount_path, subdir, filename), 'w') as f:
                        f.write(size * 'x')
            """.format(
            mount_path=self.mount_a.mountpoint,
            size_unit=size_unit,
            file_multiplier=file_multiplier,
            size_range=self.throttle_workload_size_range
        ))

        self.mount_a.run_python(create_script)

        # We will run the deletion in the background, to reduce the risk of it completing before
        # we have started monitoring the stray statistics.
        def background():
            self.mount_a.run_shell(["rm", "-rf", "delete_me"])
            self.fs.mds_asok(["flush", "journal"])

        background_thread = gevent.spawn(background)

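        # The workload created throttle_workload_size_range files for each of
        # the file_multiplier outer iterations, plus the "delete_me" directory
        # itself; e.g. for the ops test that is 100 * 16 + 1 = 1601 inodes
        # which must all eventually pass through the purge queue.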
        total_inodes = file_multiplier * self.throttle_workload_size_range + 1
        mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds'))
        mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds'))

        # During this phase we look for the concurrent ops to exceed half
        # the limit (a heuristic) and not exceed the limit (a correctness
        # condition).
        purge_timeout = 600
        elapsed = 0
        files_high_water = 0
        ops_high_water = 0

        while True:
            stats = self.fs.mds_asok(['perf', 'dump'])
            mdc_stats = stats['mds_cache']
            pq_stats = stats['purge_queue']
            if elapsed >= purge_timeout:
                raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))

            num_strays = mdc_stats['num_strays']
            num_strays_purging = pq_stats['pq_executing']
            num_purge_ops = pq_stats['pq_executing_ops']
            files_high_water = pq_stats['pq_executing_high_water']
            ops_high_water = pq_stats['pq_executing_ops_high_water']

            self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging,
                                  num_purge_ops, files_high_water, ops_high_water])

            total_strays_created = mdc_stats['strays_created']
            total_strays_purged = pq_stats['pq_executed']

            if total_strays_purged == total_inodes:
                log.info("Complete purge in {0} seconds".format(elapsed))
                break
            elif total_strays_purged > total_inodes:
                raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))
            else:
                if throttle_type == self.OPS_THROTTLE:
                    # 11 is filer_max_purge_ops plus one for the backtrace:
                    # the limit is allowed to be overshot by this much.
                    if num_purge_ops > mds_max_purge_ops + 11:
                        raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
                            num_purge_ops, mds_max_purge_ops
                        ))
                elif throttle_type == self.FILES_THROTTLE:
                    if num_strays_purging > mds_max_purge_files:
                        raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
                            num_strays_purging, mds_max_purge_files
                        ))
                else:
                    raise NotImplementedError(throttle_type)

                log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
                    num_strays_purging, num_strays,
                    total_strays_purged, total_strays_created
                ))
                time.sleep(1)
                elapsed += 1

        background_thread.join()

        # Check that we got up to a respectable rate during the purge.  This is totally
        # racy, but should be safeish unless the cluster is pathologically slow, or
        # insanely fast such that the deletions all pass before we have polled the
        # statistics.
        if throttle_type == self.OPS_THROTTLE:
            if ops_high_water < mds_max_purge_ops // 2:
                raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
                    ops_high_water, mds_max_purge_ops
                ))
            # The MDS may go over mds_max_purge_ops for some items, like a
            # heavily fragmented directory.  The throttle does not kick in
            # until *after* we reach or exceed the limit.  This is expected
            # because we don't want to starve the PQ or never purge a
            # particularly large file/directory.
            self.assertLessEqual(ops_high_water, mds_max_purge_ops + 64)
        elif throttle_type == self.FILES_THROTTLE:
            if files_high_water < mds_max_purge_files // 2:
                raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
                    files_high_water, mds_max_purge_files
                ))
            self.assertLessEqual(files_high_water, mds_max_purge_files)

        # Sanity check all MDC stray stats
        stats = self.fs.mds_asok(['perf', 'dump'])
        mdc_stats = stats['mds_cache']
        pq_stats = stats['purge_queue']
        self.assertEqual(mdc_stats['num_strays'], 0)
        self.assertEqual(mdc_stats['num_strays_delayed'], 0)
        self.assertEqual(pq_stats['pq_executing'], 0)
        self.assertEqual(pq_stats['pq_executing_ops'], 0)
        self.assertEqual(mdc_stats['strays_created'], total_inodes)
        self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
        self.assertEqual(pq_stats['pq_executed'], total_inodes)

    def get_mdc_stat(self, name, mds_id=None):
        return self.get_stat("mds_cache", name, mds_id)

    def get_stat(self, subsys, name, mds_id=None):
        return self.fs.mds_asok(['perf', 'dump', subsys, name],
                                mds_id=mds_id)[subsys][name]

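    # Illustrative usage: self.get_stat("purge_queue", "pq_executed") reads the
    # equivalent of `ceph daemon mds.<id> perf dump purge_queue pq_executed`
    # over the admin socket, and _wait_for_counter below polls that same value
    # until it reaches (and never overshoots) expect_val.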
    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
                          mds_id=None):
        self.wait_until_equal(
            lambda: self.get_stat(subsys, counter, mds_id),
            expect_val=expect_val, timeout=timeout,
            reject_fn=lambda x: x > expect_val
        )

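    # The tests below generally watch strays move through three perf counters:
    # mds_cache.strays_created when an unlinked dentry becomes a stray,
    # mds_cache.strays_enqueued when it is handed to the purge queue, and
    # purge_queue.pq_executed once its objects have actually been deleted.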
    def test_open_inode(self):
        """
        That the case of a dentry unlinked while a client holds an
        inode open is handled correctly.

        The inode should be moved into a stray dentry, while the original
        dentry and directory should be purged.

        The inode's data should be purged when the client eventually closes
        it.
        """
        mount_a_client_id = self.mount_a.get_global_id()

        # Write some bytes to a file
        size_mb = 8

        # Hold the file open
        p = self.mount_a.open_background("open_file")
        self.mount_a.write_n_mb("open_file", size_mb)
        open_file_ino = self.mount_a.path_to_ino("open_file")

        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # Unlink the dentry
        self.mount_a.run_shell(["rm", "-f", "open_file"])

        # Wait to see the stray count increment
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)

        # See that while the stray count has incremented, none have passed
        # on to the purge queue
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that the client still holds 2 caps
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # See that the data objects remain in the data pool
        self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))

        # Now close the file
        self.mount_a.kill_background(p)

        # Wait to see the client cap count decrement
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
        )
        # Wait to see the purge counter increment, stray count go to zero
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=6, reject_fn=lambda x: x > 1
        )
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        # See that the data objects no longer exist
        self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))

        self.await_data_pool_empty()

    def test_hardlink_reintegration(self):
        """
        That removal of the primary dentry of a hardlinked inode results
        in reintegration of the inode into the previously-remote dentry,
        rather than lingering as a stray indefinitely.
        """
        # Write some bytes to file_a
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "dir_1"])
        self.mount_a.write_n_mb("dir_1/file_a", size_mb)
        ino = self.mount_a.path_to_ino("dir_1/file_a")

        # Create a hardlink named file_b
        self.mount_a.run_shell(["mkdir", "dir_2"])
        self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
        self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)

        # Flush journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the file_a path
        pre_unlink_bt = self.fs.read_backtrace(ino)
        self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")

        # Empty the MDS cache; otherwise the MDS reintegrates the stray as soon
        # as the unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount_wait()

        # Unlink file_a
        self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])

        # See that a stray was created
        self.assertEqual(self.get_mdc_stat("num_strays"), 1)
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)

        # Wait, see that data objects are still present (i.e. that the
        # stray did not advance to purging given time)
        time.sleep(30)
        self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that before reintegration, the inode's backtrace points to a stray dir
        self.fs.mds_asok(['flush', 'journal'])
        self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")

        # Do a metadata operation on the remaining link (mv is heavy-handed, but
        # others like touch may be satisfied from caps without poking the MDS)
        self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the eval_remote call
        # on responding to a client request.
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the remaining link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")

        # The MDS should reintegrate the stray when the unlink finishes
        self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the notify_stray call
        # on completion of unlink
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the newest link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")

        # Now really delete it
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()
        self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))

        # We caused the inode to go stray 3 times
        self.assertEqual(self.get_mdc_stat("strays_created"), 3)
        # ...but only purged it the final time
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)

    def test_mv_hardlink_cleanup(self):
        """
        That when doing a rename from A to B, and B has hardlinks,
        we make a stray for B which is then reintegrated
        into one of its hardlinks.
        """
        # Create file_a, file_b, and a hardlink to file_b
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")

        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
        self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)

        # mv file_a file_b
        self.mount_a.run_shell(["mv", "file_a", "file_b"])

        # Stray reintegration should happen as a result of the notify_stray call on
        # completion of rename
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)

        # No data objects should have been deleted, as both files still have linkage.
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
        self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))

        self.fs.mds_asok(['flush', 'journal'])

        post_reint_bt = self.fs.read_backtrace(file_b_ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")

    def _setup_two_ranks(self):
        # Set up two MDSs
        self.fs.set_max_mds(2)

        # See that we have two active MDSs
        self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
                              reject_fn=lambda v: v > 2 or v < 1)

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]
        log.info("Ranks 0 and 1 are {0} and {1}".format(
            rank_0_id, rank_1_id))

        # Get rid of other MDS daemons so that it's easier to know which
        # daemons to expect in which ranks after restarts
        for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}:
            self.mds_cluster.mds_stop(unneeded_mds)
            self.mds_cluster.mds_fail(unneeded_mds)

        return rank_0_id, rank_1_id

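    # The helper below export-pins a directory by setting its ceph.dir.pin
    # vxattr to 1 (rank 1), then polls the target MDS's cache dump until that
    # MDS reports itself as auth for the watched inode, i.e. until the export
    # has actually completed.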
    def _force_migrate(self, to_id, path, watch_ino):
        """
        :param to_id: MDS daemon id expected to hold rank 1 (the path is pinned to rank 1)
        :param path: Filesystem path (string) to move
        :param watch_ino: Inode number to look for at destination to confirm move
        :return: None
        """
        self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path])

        # Poll the MDS cache dump to watch for the export completing
        migrated = False
        migrate_timeout = 60
        migrate_elapsed = 0
        while not migrated:
            data = self.fs.mds_asok(["dump", "cache"], to_id)
            for inode_data in data:
                if inode_data['ino'] == watch_ino:
                    log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
                    if inode_data['is_auth'] is True:
                        migrated = True
                    break

            if not migrated:
                if migrate_elapsed > migrate_timeout:
                    raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed))
                else:
                    migrate_elapsed += 1
                    time.sleep(1)

    def _is_stopped(self, rank):
        mds_map = self.fs.get_mds_map()
        return rank not in [i['rank'] for i in mds_map['info'].values()]

    def test_purge_on_shutdown(self):
        """
        That when an MDS rank is shut down, its purge queue is
        drained in the process.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 5

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        self.mount_a.umount_wait()

        # See all the strays go into the purge queue
        self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
        self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
        self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)

        # See nothing get purged from the purge queue (yet)
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)

        # Shut down rank 1
        self.fs.set_max_mds(1)

        # It shouldn't proceed past stopping because it's still not allowed
        # to purge
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
        self.assertFalse(self._is_stopped(1))

        # Permit the daemon to start purging again
        self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
                                            'injectargs',
                                            "--mds_max_purge_files 100")

        # It should now proceed through shutdown
        self.fs.wait_for_daemons(timeout=120)

        # ...and in the process purge all that data
        self.await_data_pool_empty()

    def test_migration_on_shutdown(self):
        """
        That when an MDS rank is shut down, any non-purgeable strays
        get migrated to another rank.
        """

        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Create a non-purgeable stray in a ~mds1 stray directory
        # by doing a hard link and deleting the original file
        self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"])
        self.mount_a.run_shell(["touch", "dir_1/original"])
        self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"])

        self._force_migrate(rank_1_id, "dir_1",
                            self.mount_a.path_to_ino("dir_1/original"))

        # Empty the MDS cache; otherwise the MDS reintegrates the stray as soon
        # as the unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'], rank_0_id)
        self.fs.mds_asok(['flush', 'journal'], rank_1_id)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]

        self.mount_a.mount_wait()

        self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
        self.mount_a.umount_wait()

        self._wait_for_counter("mds_cache", "strays_created", 1,
                               mds_id=rank_1_id)

        # Shut down rank 1
        self.fs.set_max_mds(1)
        self.fs.wait_for_daemons(timeout=120)

        # See that the stray counter on rank 0 has incremented
        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)

    def assert_backtrace(self, ino, expected_path):
        """
        Assert that the backtrace in the data pool for an inode matches
        an expected /foo/bar path.
        """
        expected_elements = expected_path.strip("/").split("/")
        bt = self.fs.read_backtrace(ino)
        actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']]))
        self.assertListEqual(expected_elements, actual_elements)

    def get_backtrace_path(self, ino):
        bt = self.fs.read_backtrace(ino)
        elements = reversed([dn['dname'] for dn in bt['ancestors']])
        return "/".join(elements)

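    # Note on the two backtrace helpers above: the 'ancestors' list returned by
    # read_backtrace is ordered innermost-first, so for /dir_1/file_a it looks
    # roughly like [{'dname': 'file_a', ...}, {'dname': 'dir_1', ...}]; the
    # reversed() calls turn that back into root-to-leaf path order.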
    def assert_purge_idle(self):
        """
        Assert that the MDS perf counters indicate no strays exist and
        no ongoing purge activity.  Sanity check for when PurgeQueue should
        be idle.
        """
        mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
        pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
        self.assertEqual(mdc_stats["num_strays"], 0)
        self.assertEqual(mdc_stats["num_strays_delayed"], 0)
        self.assertEqual(pq_stats["pq_executing"], 0)
        self.assertEqual(pq_stats["pq_executing_ops"], 0)

    def test_mv_cleanup(self):
        """
        That when doing a rename from A to B, and B has no hardlinks,
        we make a stray for B and purge it.
        """
        # Create file_a and file_b, write some data to both
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")
        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_a")
        self.assert_backtrace(file_b_ino, "file_b")

        # mv file_a file_b
        self.mount_a.run_shell(['mv', 'file_a', 'file_b'])

        # See that the stray counter increments
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        # Wait for the purge counter to increment
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()

        # file_b should have been purged
        self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))

        # The backtrace should have updated from file_a to file_b
        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_b")

        # file_a's data should still exist
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))

    def _pool_df(self, pool_name):
        """
        Return a dict like:

            {
                "kb_used": 0,
                "bytes_used": 0,
                "max_avail": 19630292406,
                "objects": 0
            }

        :param pool_name: Which pool (must exist)
        """
        out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")
        for p in json.loads(out)['pools']:
            if p['name'] == pool_name:
                return p['stats']

        raise RuntimeError("Pool '{0}' not found".format(pool_name))

    def await_data_pool_empty(self):
        self.wait_until_true(
            lambda: self._pool_df(
                self.fs.get_data_pool_name()
            )['objects'] == 0,
            timeout=60)

    def test_snapshot_remove(self):
        """
        That removal of a snapshot that references a now-unlinked file results
        in purging of the stray for the file.
        """
        # Enable snapshots
        self.fs.set_allow_new_snaps(True)

        # Create a dir with a file in it
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "snapdir"])
        self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
        self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
        file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")

        # Snapshot the dir
        self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])

        # Cause the head revision to deviate from the snapshot
        self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)

        # Flush the journal so that backtraces, dirfrag objects will actually be written
        self.fs.mds_asok(["flush", "journal"])

        # Unlink the file
        self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
        self.mount_a.run_shell(["rmdir", "snapdir/subdir"])

        # Unmount the client so that when we later read the file's data back we
        # get it from the cluster rather than from the client's page cache.
        self.mount_a.umount_wait()

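        # Two strays are expected at this point: one for the unlinked file_a
        # and one for the removed subdir directory.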
        self.assertEqual(self.get_mdc_stat("strays_created"), 2)

        # FIXME: at this stage we see a purge and the stray count drops to
        # zero, but there's actually still a stray, so at the very
        # least the StrayManager stats code is slightly off

        self.mount_a.mount_wait()

        # See that the data from the snapshotted revision of the file is still present
        # and correct
        self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)

        # Remove the snapshot
        self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])

        # Purging file_a doesn't happen until after we've flushed the journal, because
        # it is referenced by the snapshotted subdir, and the snapshot isn't really
        # gone until the journal references to it are gone
        self.fs.mds_asok(["flush", "journal"])

        # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs.
        # See also: http://tracker.ceph.com/issues/20072
        self.wait_until_true(
            lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024),
            timeout=60
        )

        # See that a purge happens now
        self._wait_for_counter("mds_cache", "strays_enqueued", 2)
        self._wait_for_counter("purge_queue", "pq_executed", 2)

        self.await_data_pool_empty()

    def test_fancy_layout(self):
        """
        Purge a stray file that has a fancy (non-default) layout.
        """

        file_name = "fancy_layout_file"
        self.mount_a.run_shell(["touch", file_name])

        file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
        self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)

        # 35MB requires 7 objects
        size_mb = 35
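        # With this layout, each object holds 8MB and data is striped in 1MB
        # units across sets of 4 objects: the first 32MB fill objects 0-3
        # completely, and the remaining 3MB land as 1MB each in objects 4-6,
        # giving the 7 objects noted above.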
        self.mount_a.write_n_mb(file_name, size_mb)

        self.mount_a.run_shell(["rm", "-f", file_name])
        self.fs.mds_asok(["flush", "journal"])

        # We can't use self.fs.data_objects_absent here, as it does not support fancy layouts
        self.await_data_pool_empty()

    def test_dirfrag_limit(self):
        """
        That the directory fragment size cannot exceed
        mds_bal_fragment_size_max (using a limit of 50 in all configurations).

        That fragmentation (forced) will allow more entries to be created.

        That unlinking fails when the stray directory fragment becomes too
        large, and that unlinking may continue once those strays are purged.
        """

        LOW_LIMIT = 50
        for mds in self.fs.get_daemon_names():
            self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds)

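        # mds_bal_fragment_size_max caps how many entries a single directory
        # fragment may hold; once a fragment is full the MDS rejects further
        # entries (including unlinks into a full stray fragment) with ENOSPC,
        # which is the CommandFailedError the blocks below expect.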
        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir")
                os.mkdir(path)
                for n in range(0, {file_count}):
                    with open(os.path.join(path, "%s" % n), 'w') as f:
                        f.write(str(n))
                """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT + 1
            )))
        except CommandFailedError:
            pass  # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        # Now test that we can go beyond the limit if we fragment the directory

        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            os.mkdir(path)
            for n in range(0, {file_count}):
                with open(os.path.join(path, "%s" % n), 'w') as f:
                    f.write(str(n))
            dfd = os.open(path, os.O_DIRECTORY)
            os.fsync(dfd)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

        # Ensure that subdir2 is fragmented
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id)
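        # Here "0/0" refers to the directory's root fragment and "1" is the
        # number of bits to split by, so the single fragment becomes two; each
        # resulting fragment gets its own mds_bal_fragment_size_max budget,
        # which is what lets the directory grow past the limit below.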

        # remount + flush (release client caps)
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.mount_a.mount_wait()

        # Create 50% more files than the current fragment limit
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            for n in range({file_count}, ({file_count}*3)//2):
                with open(os.path.join(path, "%s" % n), 'w') as f:
                    f.write(str(n))
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

        # Now test that the stray directory size is limited and recovers
        strays_before = self.get_mdc_stat("strays_created")
        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir3")
                os.mkdir(path)
                for n in range({file_count}):
                    fpath = os.path.join(path, "%s" % n)
                    with open(fpath, 'w') as f:
                        f.write(str(n))
                    os.unlink(fpath)
                """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT * 10  # 10 stray directories, should collide before this count
            )))
        except CommandFailedError:
            pass  # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        strays_after = self.get_mdc_stat("strays_created")
        self.assertGreaterEqual(strays_after - strays_before, LOW_LIMIT)

        self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
        self._wait_for_counter("purge_queue", "pq_executed", strays_after)

        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir4")
            os.mkdir(path)
            for n in range({file_count}):
                fpath = os.path.join(path, "%s" % n)
                with open(fpath, 'w') as f:
                    f.write(str(n))
                os.unlink(fpath)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

    def test_purge_queue_upgrade(self):
        """
        That when starting on a system with no purge queue in the metadata
        pool, we silently create one.
        """

        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
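        # 0x500 is the inode number reserved for the purge queue's journal;
        # removing its header object (500.00000000) from the metadata pool
        # simulates starting on a system from before the purge queue existed.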
        self.fs.rados(["rm", "500.00000000"])
        self.mds_cluster.mds_restart()
        self.fs.wait_for_daemons()

    def test_replicated_delete_speed(self):
        """
        That deletions of replicated metadata are not pathologically slow
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 10

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        begin = datetime.datetime.now()
        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        end = datetime.datetime.now()

        # What we're really checking here is that we are completing client
        # operations immediately rather than delaying until the next tick.
        tick_period = float(self.fs.get_config("mds_tick_interval",
                                               service_type="mds"))

        duration = (end - begin).total_seconds()
        self.assertLess(duration, (file_count * tick_period) * 0.25)