import json
import time
import logging
from textwrap import dedent
import datetime
import gevent

from teuthology.orchestra.run import CommandFailedError, Raw
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology

log = logging.getLogger(__name__)


class TestStrays(CephFSTestCase):
    MDSS_REQUIRED = 2

    OPS_THROTTLE = 1
    FILES_THROTTLE = 2

    # Range of different file sizes used in throttle test's workload
    throttle_workload_size_range = 16

    @for_teuthology
    def test_ops_throttle(self):
        self._test_throttling(self.OPS_THROTTLE)

    @for_teuthology
    def test_files_throttle(self):
        self._test_throttling(self.FILES_THROTTLE)

    def test_dir_deletion(self):
        """
        That when deleting a bunch of dentries and the containing
        directory, everything gets purged.
        Catches cases where the client might e.g. fail to trim
        the unlinked dir from its cache.
        """
        file_count = 1000
        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size = {size}
            file_count = {file_count}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_count):
                filename = "{{0}}_{{1}}.bin".format(i, size)
                f = open(os.path.join(mount_path, subdir, filename), 'w')
                f.write(size * 'x')
                f.close()
        """.format(
            mount_path=self.mount_a.mountpoint,
            size=1024,
            file_count=file_count
        ))

        self.mount_a.run_python(create_script)

        # That the dirfrag object is created
        self.fs.mds_asok(["flush", "journal"])
        dir_ino = self.mount_a.path_to_ino("delete_me")
        self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))

        # Remove everything
        self.mount_a.run_shell(["rm", "-rf", "delete_me"])
        self.fs.mds_asok(["flush", "journal"])

        # That all the removed files get created as strays
        strays = self.get_mdc_stat("strays_created")
        self.assertEqual(strays, file_count + 1)
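        # (The extra stray beyond file_count is expected to be the "delete_me"
        # directory itself, which also becomes a stray when the rm -rf above
        # removes it.)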

        # That the strays all get enqueued for purge
        self.wait_until_equal(
            lambda: self.get_mdc_stat("strays_enqueued"),
            strays,
            timeout=600
        )

        # That all the purge operations execute
        self.wait_until_equal(
            lambda: self.get_stat("purge_queue", "pq_executed"),
            strays,
            timeout=600
        )

        # That finally, the directory metadata object is gone
        self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))

        # That finally, the data objects are all gone
        self.await_data_pool_empty()

    def _test_throttling(self, throttle_type):
        self.data_log = []
        try:
            return self._do_test_throttling(throttle_type)
        except:
            for l in self.data_log:
                log.info(",".join([str(l_) for l_ in l]))
            raise

    def _do_test_throttling(self, throttle_type):
        """
        That the mds_max_purge_ops setting is respected
        """

        def set_throttles(files, ops):
            """
            Helper for updating ops/files limits, and calculating effective
            ops_per_pg setting to give the same ops limit.
            """
            self.set_conf('mds', 'mds_max_purge_files', "%d" % files)
            self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops)

            pgs = self.fs.mon_manager.get_pool_property(
                self.fs.get_data_pool_name(),
                "pg_num"
            )
            ops_per_pg = float(ops) / pgs
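            # Worked example (numbers are illustrative, not taken from this
            # test): with ops=16 and a data pool of 64 PGs, ops_per_pg comes
            # out to 16 / 64 = 0.25, which should give the same overall ops
            # limit as the plain mds_max_purge_ops setting.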
            self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg)

        # Test conditions depend on what we're going to be exercising.
        # * Lift the threshold on whatever throttle we are *not* testing, so
        #   that the throttle of interest is the one that will be the bottleneck
        # * Create either many small files (test file count throttling) or fewer
        #   large files (test op throttling)
        if throttle_type == self.OPS_THROTTLE:
            set_throttles(files=100000000, ops=16)
            size_unit = 1024 * 1024  # big files, generate lots of ops
            file_multiplier = 100
        elif throttle_type == self.FILES_THROTTLE:
            # The default value of the file limit is pretty permissive, so to avoid
            # the test running too fast, create lots of files and set the limit
            # pretty low.
            set_throttles(ops=100000000, files=6)
            size_unit = 1024  # small, numerous files
            file_multiplier = 200
        else:
            raise NotImplementedError(throttle_type)

        # Pick up config changes
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size_unit = {size_unit}
            file_multiplier = {file_multiplier}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_multiplier):
                for size in xrange(0, {size_range}*size_unit, size_unit):
                    filename = "{{0}}_{{1}}.bin".format(i, size / size_unit)
                    f = open(os.path.join(mount_path, subdir, filename), 'w')
                    f.write(size * 'x')
                    f.close()
        """.format(
            mount_path=self.mount_a.mountpoint,
            size_unit=size_unit,
            file_multiplier=file_multiplier,
            size_range=self.throttle_workload_size_range
        ))

        self.mount_a.run_python(create_script)

        # We will run the deletion in the background, to reduce the risk of it completing before
        # we have started monitoring the stray statistics.
        def background():
            self.mount_a.run_shell(["rm", "-rf", "delete_me"])
            self.fs.mds_asok(["flush", "journal"])

        background_thread = gevent.spawn(background)

        total_inodes = file_multiplier * self.throttle_workload_size_range + 1
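        # e.g. for the OPS_THROTTLE case this works out to
        # 100 * 16 + 1 = 1601 inodes; the +1 accounts for the "delete_me"
        # directory that is removed along with the files.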
        mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds'))
        mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds'))

        # During this phase we look for the concurrent ops to exceed half
        # the limit (a heuristic) and not exceed the limit (a correctness
        # condition).
        purge_timeout = 600
        elapsed = 0
        files_high_water = 0
        ops_high_water = 0

        while True:
            stats = self.fs.mds_asok(['perf', 'dump'])
            mdc_stats = stats['mds_cache']
            pq_stats = stats['purge_queue']
            if elapsed >= purge_timeout:
                raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))

            num_strays = mdc_stats['num_strays']
            num_strays_purging = pq_stats['pq_executing']
            num_purge_ops = pq_stats['pq_executing_ops']

            self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops])

            files_high_water = max(files_high_water, num_strays_purging)
            ops_high_water = max(ops_high_water, num_purge_ops)

            total_strays_created = mdc_stats['strays_created']
            total_strays_purged = pq_stats['pq_executed']

            if total_strays_purged == total_inodes:
                log.info("Complete purge in {0} seconds".format(elapsed))
                break
            elif total_strays_purged > total_inodes:
                raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))
            else:
                if throttle_type == self.OPS_THROTTLE:
                    # 11 is filer_max_purge_ops plus one for the backtrace:
                    # limit is allowed to be overshot by this much.
                    if num_purge_ops > mds_max_purge_ops + 11:
                        raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
                            num_purge_ops, mds_max_purge_ops
                        ))
                elif throttle_type == self.FILES_THROTTLE:
                    if num_strays_purging > mds_max_purge_files:
                        raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
                            num_strays_purging, mds_max_purge_files
                        ))
                else:
                    raise NotImplementedError(throttle_type)

                log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
                    num_strays_purging, num_strays,
                    total_strays_purged, total_strays_created
                ))
                time.sleep(1)
                elapsed += 1

        background_thread.join()

        # Check that we got up to a respectable rate during the purge. This is totally
        # racy, but should be safeish unless the cluster is pathologically slow, or
        # insanely fast such that the deletions all pass before we have polled the
        # statistics.
        if throttle_type == self.OPS_THROTTLE:
            if ops_high_water < mds_max_purge_ops / 2:
                raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
                    ops_high_water, mds_max_purge_ops
                ))
        elif throttle_type == self.FILES_THROTTLE:
            if files_high_water < mds_max_purge_files / 2:
                raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
                    files_high_water, mds_max_purge_files
                ))

        # Sanity check all MDC stray stats
        stats = self.fs.mds_asok(['perf', 'dump'])
        mdc_stats = stats['mds_cache']
        pq_stats = stats['purge_queue']
        self.assertEqual(mdc_stats['num_strays'], 0)
        self.assertEqual(mdc_stats['num_strays_delayed'], 0)
        self.assertEqual(pq_stats['pq_executing'], 0)
        self.assertEqual(pq_stats['pq_executing_ops'], 0)
        self.assertEqual(mdc_stats['strays_created'], total_inodes)
        self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
        self.assertEqual(pq_stats['pq_executed'], total_inodes)

    def get_mdc_stat(self, name, mds_id=None):
        return self.get_stat("mds_cache", name, mds_id)

    def get_stat(self, subsys, name, mds_id=None):
        return self.fs.mds_asok(['perf', 'dump', subsys, name],
                                mds_id=mds_id)[subsys][name]
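
    # Note: get_stat() above is roughly equivalent to running
    #   ceph daemon mds.<id> perf dump <subsys> <counter>
    # against the MDS admin socket and pulling the named value out of the
    # resulting JSON (the exact filtering behaviour is an assumption here).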

    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
                          mds_id=None):
        self.wait_until_equal(
            lambda: self.get_stat(subsys, counter, mds_id),
            expect_val=expect_val, timeout=timeout,
            reject_fn=lambda x: x > expect_val
        )

    def test_open_inode(self):
        """
        That the case of a dentry unlinked while a client holds an
        inode open is handled correctly.

        The inode should be moved into a stray dentry, while the original
        dentry and directory should be purged.

        The inode's data should be purged when the client eventually closes
        it.
        """
        mount_a_client_id = self.mount_a.get_global_id()

        # Write some bytes to a file
        size_mb = 8

        # Hold the file open
        p = self.mount_a.open_background("open_file")
        self.mount_a.write_n_mb("open_file", size_mb)
        open_file_ino = self.mount_a.path_to_ino("open_file")

        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)
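        # (The expectation of exactly 2 caps is presumably one cap for the
        # client's root directory and one for open_file itself.)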

        # Unlink the dentry
        self.mount_a.run_shell(["rm", "-f", "open_file"])

        # Wait to see the stray count increment
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)

        # See that while the stray count has incremented, none have passed
        # on to the purge queue
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that the client still holds 2 caps
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # See that the data objects remain in the data pool
        self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))

        # Now close the file
        self.mount_a.kill_background(p)

        # Wait to see the client cap count decrement
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
        )
        # Wait to see the purge counter increment, stray count go to zero
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=6, reject_fn=lambda x: x > 1
        )
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        # See that the data objects no longer exist
        self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))

        self.await_data_pool_empty()

    def test_hardlink_reintegration(self):
        """
        That removal of the primary dentry of a hardlinked inode results
        in reintegration of the inode into the previously-remote dentry,
        rather than lingering as a stray indefinitely.
        """
        # Write some bytes to file_a
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "dir_1"])
        self.mount_a.write_n_mb("dir_1/file_a", size_mb)
        ino = self.mount_a.path_to_ino("dir_1/file_a")

        # Create a hardlink named file_b
        self.mount_a.run_shell(["mkdir", "dir_2"])
        self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
        self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)

        # Flush journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the file_a path
        pre_unlink_bt = self.fs.read_backtrace(ino)
        self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")

        # Empty the MDS cache; otherwise the MDS reintegrates the stray as
        # soon as the unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount()

        # Unlink file_a
        self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])

        # See that a stray was created
        self.assertEqual(self.get_mdc_stat("num_strays"), 1)
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)

        # Wait, and see that the data objects are still present (i.e. that the
        # stray did not advance to purging, given time)
        time.sleep(30)
        self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that before reintegration, the inode's backtrace points to a stray dir
        self.fs.mds_asok(['flush', 'journal'])
        self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))
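        # Stray dentries live in the MDS rank's private stray directories
        # (stray0..stray9 under its ~mdsN dir), hence the "stray" prefix in
        # the backtrace until reintegration happens.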

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")

        # Do a metadata operation on the remaining link (mv is heavy handed, but
        # others like touch may be satisfied from caps without poking MDS)
        self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the eval_remote call
        # on responding to a client request.
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the remaining link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")

        # The MDS should reintegrate the stray when the unlink finishes
        self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the notify_stray call
        # on completion of unlink
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the newest link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")

        # Now really delete it
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()
        self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))

        # We caused the inode to go stray 3 times
        self.assertEqual(self.get_mdc_stat("strays_created"), 3)
        # We purged it at the end
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)

    def test_mv_hardlink_cleanup(self):
        """
        That when doing a rename from A to B, and B has hardlinks,
        then we make a stray for B which is then reintegrated
        into one of its hardlinks.
        """
        # Create file_a, file_b, and a hardlink to file_b
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")

        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
        self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)

        # mv file_a file_b
        self.mount_a.run_shell(["mv", "file_a", "file_b"])

        # Stray reintegration should happen as a result of the notify_stray call on
        # completion of rename
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)

        # No data objects should have been deleted, as both files still have linkage.
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
        self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))

        self.fs.mds_asok(['flush', 'journal'])

        post_reint_bt = self.fs.read_backtrace(file_b_ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")

    def _setup_two_ranks(self):
        # Set up two MDSs
        self.fs.set_max_mds(2)

        # See that we have two active MDSs
        self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
                              reject_fn=lambda v: v > 2 or v < 1)

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]
        log.info("Ranks 0 and 1 are {0} and {1}".format(
            rank_0_id, rank_1_id))

        # Get rid of other MDS daemons so that it's easier to know which
        # daemons to expect in which ranks after restarts
        for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}:
            self.mds_cluster.mds_stop(unneeded_mds)
            self.mds_cluster.mds_fail(unneeded_mds)

        return rank_0_id, rank_1_id

    def _force_migrate(self, to_id, path, watch_ino):
        """
        :param to_id: MDS id to move it to
        :param path: Filesystem path (string) to move
        :param watch_ino: Inode number to look for at destination to confirm move
        :return: None
        """
        self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path])
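        # Setting the ceph.dir.pin vxattr to 1 pins the subtree at `path` to
        # MDS rank 1; this helper assumes that `to_id` is the daemon currently
        # holding rank 1, so the cache poll below watches the right MDS.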

        # Poll the MDS cache dump to watch for the export completing
        migrated = False
        migrate_timeout = 60
        migrate_elapsed = 0
        while not migrated:
            data = self.fs.mds_asok(["dump", "cache"], to_id)
            for inode_data in data:
                if inode_data['ino'] == watch_ino:
                    log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
                    if inode_data['is_auth'] is True:
                        migrated = True
                    break

            if not migrated:
                if migrate_elapsed > migrate_timeout:
                    raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed))
                else:
                    migrate_elapsed += 1
                    time.sleep(1)

    def _is_stopped(self, rank):
        mds_map = self.fs.get_mds_map()
        return rank not in [i['rank'] for i in mds_map['info'].values()]

    def test_purge_on_shutdown(self):
        """
        That when an MDS rank is shut down, its purge queue is
        drained in the process.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 5

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        self.mount_a.umount_wait()

        # See all the strays go into purge queue
        self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
        self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
        self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)

        # See nothing get purged from the purge queue (yet)
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)

        # Shut down rank 1
        self.fs.set_max_mds(1)
        self.fs.deactivate(1)

        # It shouldn't proceed past stopping because it's still not allowed
        # to purge
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
        self.assertFalse(self._is_stopped(1))

        # Permit the daemon to start purging again
        self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
                                            'injectargs',
                                            "--mds_max_purge_files 100")

        # It should now proceed through shutdown
        self.wait_until_true(
            lambda: self._is_stopped(1),
            timeout=60
        )

        # ...and in the process purge all that data
        self.await_data_pool_empty()

    def test_migration_on_shutdown(self):
        """
        That when an MDS rank is shut down, any non-purgeable strays
        get migrated to another rank.
        """

        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Create a non-purgeable stray in a ~mds1 stray directory
        # by doing a hard link and deleting the original file
        self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"])
        self.mount_a.run_shell(["touch", "dir_1/original"])
        self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"])

        self._force_migrate(rank_1_id, "dir_1",
                            self.mount_a.path_to_ino("dir_1/original"))

        # Empty the MDS cache; otherwise the MDS reintegrates the stray when
        # the unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'], rank_0_id)
        self.fs.mds_asok(['flush', 'journal'], rank_1_id)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]

        self.mount_a.mount()

        self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
        self.mount_a.umount_wait()

        self._wait_for_counter("mds_cache", "strays_created", 1,
                               mds_id=rank_1_id)

        # Shut down rank 1
        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "1")
        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'deactivate', "1")

        # Wait until we get to a single active MDS mdsmap state
        self.wait_until_true(lambda: self._is_stopped(1), timeout=120)

        # See that the stray counter on rank 0 has incremented
        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)

    def assert_backtrace(self, ino, expected_path):
        """
        Assert that the backtrace in the data pool for an inode matches
        an expected /foo/bar path.
        """
        expected_elements = expected_path.strip("/").split("/")
        bt = self.fs.read_backtrace(ino)
        actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']]))
        self.assertListEqual(expected_elements, actual_elements)

    def get_backtrace_path(self, ino):
        bt = self.fs.read_backtrace(ino)
        elements = reversed([dn['dname'] for dn in bt['ancestors']])
        return "/".join(elements)

    def assert_purge_idle(self):
        """
        Assert that the MDS perf counters indicate no strays exist and
        no ongoing purge activity. Sanity check for when PurgeQueue should
        be idle.
        """
        mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
        pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
        self.assertEqual(mdc_stats["num_strays"], 0)
        self.assertEqual(mdc_stats["num_strays_delayed"], 0)
        self.assertEqual(pq_stats["pq_executing"], 0)
        self.assertEqual(pq_stats["pq_executing_ops"], 0)

    def test_mv_cleanup(self):
        """
        That when doing a rename from A to B, and B has no hardlinks,
        then we make a stray for B and purge it.
        """
        # Create file_a and file_b, write some to both
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")
        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_a")
        self.assert_backtrace(file_b_ino, "file_b")

        # mv file_a file_b
        self.mount_a.run_shell(['mv', 'file_a', 'file_b'])

        # See that the stray counter increments
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        # Wait for the purge counter to increment
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()

        # file_b should have been purged
        self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))

        # Backtrace should have updated from file_a to file_b
        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_b")

        # file_a's data should still exist
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))

    def _pool_df(self, pool_name):
        """
        Return a dict like
            {
                "kb_used": 0,
                "bytes_used": 0,
                "max_avail": 19630292406,
                "objects": 0
            }

        :param pool_name: Which pool (must exist)
        """
        out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")
        for p in json.loads(out)['pools']:
            if p['name'] == pool_name:
                return p['stats']

        raise RuntimeError("Pool '{0}' not found".format(pool_name))

    def await_data_pool_empty(self):
        self.wait_until_true(
            lambda: self._pool_df(
                self.fs.get_data_pool_name()
            )['objects'] == 0,
            timeout=60)

    def test_snapshot_remove(self):
        """
        That removal of a snapshot that references a now-unlinked file results
        in purging of the stray for the file.
        """
        # Enable snapshots
        self.fs.mon_manager.raw_cluster_cmd("mds", "set", "allow_new_snaps", "true",
                                            "--yes-i-really-mean-it")

        # Create a dir with a file in it
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "snapdir"])
        self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
        self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
        file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")

        # Snapshot the dir
        self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])

        # Cause the head revision to deviate from the snapshot
        self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)

        # Flush the journal so that backtraces, dirfrag objects will actually be written
        self.fs.mds_asok(["flush", "journal"])

        # Unlink the file
        self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
        self.mount_a.run_shell(["rmdir", "snapdir/subdir"])

        # Unmount the client because when I come back to check the data is still
        # in the file I don't want to just see what's in the page cache.
        self.mount_a.umount_wait()

        self.assertEqual(self.get_mdc_stat("strays_created"), 2)

        # FIXME: at this stage we see a purge and the stray count drops to
        # zero, but there's actually still a stray, so at the very
        # least the StrayManager stats code is slightly off

        self.mount_a.mount()

        # See that the data from the snapshotted revision of the file is still present
        # and correct
        self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)

        # Remove the snapshot
        self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])

        # Purging file_a doesn't happen until after we've flushed the journal, because
        # it is referenced by the snapshotted subdir, and the snapshot isn't really
        # gone until the journal references to it are gone
        self.fs.mds_asok(["flush", "journal"])

        # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs.
        # See also: http://tracker.ceph.com/issues/20072
        self.wait_until_true(
            lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024),
            timeout=60
        )

        # See that a purge happens now
        self._wait_for_counter("mds_cache", "strays_enqueued", 2)
        self._wait_for_counter("purge_queue", "pq_executed", 2)

        self.await_data_pool_empty()

    def test_fancy_layout(self):
        """
        Purge a stray file with a fancy (striped) layout
        """

        file_name = "fancy_layout_file"
        self.mount_a.run_shell(["touch", file_name])

        file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
        self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)

        # 35MB requires 7 objects
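        # (Arithmetic: one object set is stripe_count * object_size = 32MB
        # spread over 4 objects; the remaining 3MB is striped in 1MB units
        # across the first 3 objects of the next set, giving 4 + 3 = 7.)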
        size_mb = 35
        self.mount_a.write_n_mb(file_name, size_mb)

        self.mount_a.run_shell(["rm", "-f", file_name])
        self.fs.mds_asok(["flush", "journal"])

        # can't use self.fs.data_objects_absent here, it does not support fancy layout
        self.await_data_pool_empty()

    def test_dirfrag_limit(self):
        """
        That the directory fragment size cannot exceed
        mds_bal_fragment_size_max (using a limit of 50 in all configurations).

        That fragmentation (forced) will allow more entries to be created.

        That unlinking fails when the stray directory fragment becomes too
        large and that unlinking may continue once those strays are purged.
        """

        self.fs.set_allow_dirfrags(True)

        LOW_LIMIT = 50
        for mds in self.fs.get_daemon_names():
            self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds)

        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir")
                os.mkdir(path)
                for n in range(0, {file_count}):
                    open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
                """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT+1
            )))
        except CommandFailedError:
            pass  # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        # Now test that we can go beyond the limit if we fragment the directory

        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            os.mkdir(path)
            for n in range(0, {file_count}):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            dfd = os.open(path, os.O_DIRECTORY)
            os.fsync(dfd)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

        # Ensure that subdir2 is fragmented
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id)
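        # The admin socket command above asks the MDS to split dirfrag 0/0
        # (the single root fragment) of /subdir2 by one bit, i.e. into two
        # child fragments, so entries are spread over more than one fragment.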

        # remount+flush (release client caps)
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # Create 50% more files than the current fragment limit
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            for n in range({file_count}, ({file_count}*3)//2):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

        # Now test that the stray directory size is limited and recovers
        strays_before = self.get_mdc_stat("strays_created")
        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir3")
                os.mkdir(path)
                for n in range({file_count}):
                    fpath = os.path.join(path, "%s" % n)
                    f = open(fpath, 'w')
                    f.write("%s" % n)
                    f.close()
                    os.unlink(fpath)
                """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT*10  # 10 stray directories, should collide before this count
            )))
        except CommandFailedError:
            pass  # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        strays_after = self.get_mdc_stat("strays_created")
        self.assertGreaterEqual(strays_after-strays_before, LOW_LIMIT)

        self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
        self._wait_for_counter("purge_queue", "pq_executed", strays_after)

        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir4")
            os.mkdir(path)
            for n in range({file_count}):
                fpath = os.path.join(path, "%s" % n)
                f = open(fpath, 'w')
                f.write("%s" % n)
                f.close()
                os.unlink(fpath)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

    def test_purge_queue_upgrade(self):
        """
        That when starting on a system with no purge queue in the metadata
        pool, we silently create one.
        :return:
        """

        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
        self.fs.rados(["rm", "500.00000000"])
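        # Object 500.00000000 is the header object of the purge queue's
        # journal (0x500 being the inode number reserved for the purge queue),
        # so removing it simulates a filesystem created before PurgeQueue
        # existed.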
        self.mds_cluster.mds_restart()
        self.fs.wait_for_daemons()

    def test_replicated_delete_speed(self):
        """
        That deletions of replicated metadata are not pathologically slow
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 10

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        begin = datetime.datetime.now()
        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        end = datetime.datetime.now()

        # What we're really checking here is that we are completing client
        # operations immediately rather than delaying until the next tick.
        tick_period = float(self.fs.get_config("mds_tick_interval",
                                               service_type="mds"))

        duration = (end - begin).total_seconds()
        self.assertLess(duration, (file_count * tick_period) * 0.25)
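        # Rough reasoning for the 0.25 factor: if every unlink waited for the
        # next MDS tick (mds_tick_interval, typically 5s by default), deleting
        # file_count files would take on the order of file_count * tick_period
        # seconds, so finishing in well under a quarter of that indicates the
        # replies are not being tick-delayed.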