1 import json
2 import time
3 import logging
4 from textwrap import dedent
5 import datetime
6 import gevent
7 from teuthology.orchestra.run import CommandFailedError, Raw
8 from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology
9
10 log = logging.getLogger(__name__)
11
12
13 class TestStrays(CephFSTestCase):
14 MDSS_REQUIRED = 2
15
16 OPS_THROTTLE = 1
17 FILES_THROTTLE = 2
18
19 # Range of different file sizes used in throttle test's workload
20 throttle_workload_size_range = 16
21
22 @for_teuthology
23 def test_ops_throttle(self):
24 self._test_throttling(self.OPS_THROTTLE)
25
26 @for_teuthology
27 def test_files_throttle(self):
28 self._test_throttling(self.FILES_THROTTLE)
29
30 def test_dir_deletion(self):
31 """
32 That when deleting a bunch of dentries and the containing
33 directory, everything gets purged.
34 Catches cases where the client might e.g. fail to trim
35 the unlinked dir from its cache.
36 """
37 file_count = 1000
38 create_script = dedent("""
39 import os
40
41 mount_path = "{mount_path}"
42 subdir = "delete_me"
43 size = {size}
44 file_count = {file_count}
45 os.mkdir(os.path.join(mount_path, subdir))
46 for i in xrange(0, file_count):
47 filename = "{{0}}_{{1}}.bin".format(i, size)
48 f = open(os.path.join(mount_path, subdir, filename), 'w')
49 f.write(size * 'x')
50 f.close()
51 """.format(
52 mount_path=self.mount_a.mountpoint,
53 size=1024,
54 file_count=file_count
55 ))
56
57 self.mount_a.run_python(create_script)
58
59 # That the dirfrag object is created
60 self.fs.mds_asok(["flush", "journal"])
61 dir_ino = self.mount_a.path_to_ino("delete_me")
62 self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))
63
64 # Remove everything
65 self.mount_a.run_shell(["rm", "-rf", "delete_me"])
66 self.fs.mds_asok(["flush", "journal"])
67
68         # That all the removed files (plus the directory itself) get created as strays
69 strays = self.get_mdc_stat("strays_created")
70 self.assertEqual(strays, file_count + 1)
71
72 # That the strays all get enqueued for purge
73 self.wait_until_equal(
74 lambda: self.get_mdc_stat("strays_enqueued"),
75 strays,
76 timeout=600
78 )
79
80 # That all the purge operations execute
81 self.wait_until_equal(
82 lambda: self.get_stat("purge_queue", "pq_executed"),
83 strays,
84 timeout=600
85 )
86
87 # That finally, the directory metadata object is gone
88 self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))
89
90 # That finally, the data objects are all gone
91 self.await_data_pool_empty()
92
93 def _test_throttling(self, throttle_type):
94 self.data_log = []
95 try:
96 return self._do_test_throttling(throttle_type)
97 except:
98 for l in self.data_log:
99 log.info(",".join([l_.__str__() for l_ in l]))
100 raise
101
102 def _do_test_throttling(self, throttle_type):
103 """
104 That the mds_max_purge_ops setting is respected
105 """
106
107 def set_throttles(files, ops):
108 """
109 Helper for updating ops/files limits, and calculating effective
110 ops_per_pg setting to give the same ops limit.
111 """
112 self.set_conf('mds', 'mds_max_purge_files', "%d" % files)
113 self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops)
114
115 pgs = self.fs.mon_manager.get_pool_property(
116 self.fs.get_data_pool_name(),
117 "pg_num"
118 )
119 ops_per_pg = float(ops) / pgs
120 self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg)
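# Worked example (hypothetical pool size): with ops=16 and an 8-PG data
# pool, ops_per_pg = 16 / 8 = 2.0, so the per-PG throttle allows the same
# overall ops budget as the absolute mds_max_purge_ops limit.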
121
122 # Test conditions depend on what we're going to be exercising.
123 # * Lift the threshold on whatever throttle we are *not* testing, so
124 # that the throttle of interest is the one that will be the bottleneck
125 # * Create either many small files (test file count throttling) or fewer
126 # large files (test op throttling)
127 if throttle_type == self.OPS_THROTTLE:
128 set_throttles(files=100000000, ops=16)
129 size_unit = 1024 * 1024 # big files, generate lots of ops
130 file_multiplier = 100
131 elif throttle_type == self.FILES_THROTTLE:
132             # The default value of the file limit is pretty permissive, so to avoid
133             # the test completing too quickly, create lots of files and set the limit
134             # pretty low.
135 set_throttles(ops=100000000, files=6)
136 size_unit = 1024 # small, numerous files
137 file_multiplier = 200
138 else:
139             raise NotImplementedError(throttle_type)
140
141 # Pick up config changes
142 self.fs.mds_fail_restart()
143 self.fs.wait_for_daemons()
144
145 create_script = dedent("""
146 import os
147
148 mount_path = "{mount_path}"
149 subdir = "delete_me"
150 size_unit = {size_unit}
151 file_multiplier = {file_multiplier}
152 os.mkdir(os.path.join(mount_path, subdir))
153 for i in xrange(0, file_multiplier):
154 for size in xrange(0, {size_range}*size_unit, size_unit):
155 filename = "{{0}}_{{1}}.bin".format(i, size / size_unit)
156 f = open(os.path.join(mount_path, subdir, filename), 'w')
157 f.write(size * 'x')
158 f.close()
159 """.format(
160 mount_path=self.mount_a.mountpoint,
161 size_unit=size_unit,
162 file_multiplier=file_multiplier,
163 size_range=self.throttle_workload_size_range
164 ))
165
166 self.mount_a.run_python(create_script)
167
168 # We will run the deletion in the background, to reduce the risk of it completing before
169 # we have started monitoring the stray statistics.
170 def background():
171 self.mount_a.run_shell(["rm", "-rf", "delete_me"])
172 self.fs.mds_asok(["flush", "journal"])
173
174 background_thread = gevent.spawn(background)
175
176 total_inodes = file_multiplier * self.throttle_workload_size_range + 1
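# total_inodes: file_multiplier files are written for each of the
# throttle_workload_size_range sizes, plus one for the containing
# directory, e.g. 100 * 16 + 1 = 1601 strays in the OPS_THROTTLE case.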
177 mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds'))
178 mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds'))
179
180 # During this phase we look for the concurrent ops to exceed half
181 # the limit (a heuristic) and not exceed the limit (a correctness
182 # condition).
183 purge_timeout = 600
184 elapsed = 0
185 files_high_water = 0
186 ops_high_water = 0
187
188 while True:
189 stats = self.fs.mds_asok(['perf', 'dump'])
190 mdc_stats = stats['mds_cache']
191 pq_stats = stats['purge_queue']
192 if elapsed >= purge_timeout:
193 raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))
194
195 num_strays = mdc_stats['num_strays']
196 num_strays_purging = pq_stats['pq_executing']
197 num_purge_ops = pq_stats['pq_executing_ops']
198
199 self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops])
200
201 files_high_water = max(files_high_water, num_strays_purging)
202 ops_high_water = max(ops_high_water, num_purge_ops)
203
204 total_strays_created = mdc_stats['strays_created']
205 total_strays_purged = pq_stats['pq_executed']
206
207 if total_strays_purged == total_inodes:
208 log.info("Complete purge in {0} seconds".format(elapsed))
209 break
210 elif total_strays_purged > total_inodes:
211 raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))
212 else:
213 if throttle_type == self.OPS_THROTTLE:
214 # 11 is filer_max_purge_ops plus one for the backtrace:
215 # limit is allowed to be overshot by this much.
216 if num_purge_ops > mds_max_purge_ops + 11:
217 raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
218 num_purge_ops, mds_max_purge_ops
219 ))
220 elif throttle_type == self.FILES_THROTTLE:
221 if num_strays_purging > mds_max_purge_files:
222 raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
223 num_strays_purging, mds_max_purge_files
224 ))
225 else:
226                     raise NotImplementedError(throttle_type)
227
228 log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
229 num_strays_purging, num_strays,
230 total_strays_purged, total_strays_created
231 ))
232 time.sleep(1)
233 elapsed += 1
234
235 background_thread.join()
236
237         # Check that we got up to a respectable rate during the purge. This is inherently
238         # racy, but should be safe enough unless the cluster is pathologically slow, or
239         # so fast that the deletions all complete before we have polled the
240         # statistics.
241 if throttle_type == self.OPS_THROTTLE:
242 if ops_high_water < mds_max_purge_ops / 2:
243 raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
244 ops_high_water, mds_max_purge_ops
245 ))
246 elif throttle_type == self.FILES_THROTTLE:
247 if files_high_water < mds_max_purge_files / 2:
248 raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
249                     files_high_water, mds_max_purge_files
250 ))
251
252 # Sanity check all MDC stray stats
253 stats = self.fs.mds_asok(['perf', 'dump'])
254 mdc_stats = stats['mds_cache']
255 pq_stats = stats['purge_queue']
256 self.assertEqual(mdc_stats['num_strays'], 0)
257 self.assertEqual(mdc_stats['num_strays_delayed'], 0)
258 self.assertEqual(pq_stats['pq_executing'], 0)
259 self.assertEqual(pq_stats['pq_executing_ops'], 0)
260 self.assertEqual(mdc_stats['strays_created'], total_inodes)
261 self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
262 self.assertEqual(pq_stats['pq_executed'], total_inodes)
263
264 def get_mdc_stat(self, name, mds_id=None):
265 return self.get_stat("mds_cache", name, mds_id)
266
267 def get_stat(self, subsys, name, mds_id=None):
268 return self.fs.mds_asok(['perf', 'dump', subsys, name],
269 mds_id=mds_id)[subsys][name]
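# Example usage, as seen throughout this class:
#   self.get_stat("purge_queue", "pq_executed")
# returns the current value of that counter from the MDS "perf dump" output.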
270
271 def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
272 mds_id=None):
273 self.wait_until_equal(
274 lambda: self.get_stat(subsys, counter, mds_id),
275 expect_val=expect_val, timeout=timeout,
276 reject_fn=lambda x: x > expect_val
277 )
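# Note: the reject_fn makes wait_until_equal fail immediately if the counter
# overshoots expect_val, rather than waiting out the full timeout.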
278
279 def test_open_inode(self):
280 """
281 That the case of a dentry unlinked while a client holds an
282 inode open is handled correctly.
283
284 The inode should be moved into a stray dentry, while the original
285 dentry and directory should be purged.
286
287 The inode's data should be purged when the client eventually closes
288 it.
289 """
290 mount_a_client_id = self.mount_a.get_global_id()
291
292 # Write some bytes to a file
293 size_mb = 8
294
295 # Hold the file open
296 p = self.mount_a.open_background("open_file")
297 self.mount_a.write_n_mb("open_file", size_mb)
298 open_file_ino = self.mount_a.path_to_ino("open_file")
299
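# Two caps are expected here: presumably one for the mount's root directory
# and one for the newly opened/written file (the same count is re-checked
# after the unlink below).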
300 self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)
301
302 # Unlink the dentry
303 self.mount_a.run_shell(["rm", "-f", "open_file"])
304
305 # Wait to see the stray count increment
306 self.wait_until_equal(
307 lambda: self.get_mdc_stat("num_strays"),
308 expect_val=1, timeout=60, reject_fn=lambda x: x > 1)
309
310 # See that while the stray count has incremented, none have passed
311 # on to the purge queue
312 self.assertEqual(self.get_mdc_stat("strays_created"), 1)
313 self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)
314
315 # See that the client still holds 2 caps
316 self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)
317
318 # See that the data objects remain in the data pool
319 self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))
320
321 # Now close the file
322 self.mount_a.kill_background(p)
323
324 # Wait to see the client cap count decrement
325 self.wait_until_equal(
326 lambda: self.get_session(mount_a_client_id)['num_caps'],
327 expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
328 )
329 # Wait to see the purge counter increment, stray count go to zero
330 self._wait_for_counter("mds_cache", "strays_enqueued", 1)
331 self.wait_until_equal(
332 lambda: self.get_mdc_stat("num_strays"),
333 expect_val=0, timeout=6, reject_fn=lambda x: x > 1
334 )
335 self._wait_for_counter("purge_queue", "pq_executed", 1)
336
337 # See that the data objects no longer exist
338 self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))
339
340 self.await_data_pool_empty()
341
342 def test_hardlink_reintegration(self):
343 """
344         That removal of the primary dentry of a hardlinked inode results
345         in reintegration of the inode into the previously-remote dentry,
346         rather than it lingering as a stray indefinitely.
347 """
348 # Write some bytes to file_a
349 size_mb = 8
350 self.mount_a.run_shell(["mkdir", "dir_1"])
351 self.mount_a.write_n_mb("dir_1/file_a", size_mb)
352 ino = self.mount_a.path_to_ino("dir_1/file_a")
353
354 # Create a hardlink named file_b
355 self.mount_a.run_shell(["mkdir", "dir_2"])
356 self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
357 self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)
358
359 # Flush journal
360 self.fs.mds_asok(['flush', 'journal'])
361
362 # See that backtrace for the file points to the file_a path
363 pre_unlink_bt = self.fs.read_backtrace(ino)
364 self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")
365
366         # Empty the MDS cache; otherwise the MDS reintegrates the stray when the unlink finishes
367 self.mount_a.umount_wait()
368 self.fs.mds_asok(['flush', 'journal'])
369 self.fs.mds_fail_restart()
370 self.fs.wait_for_daemons()
371 self.mount_a.mount()
372
373 # Unlink file_a
374 self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])
375
376 # See that a stray was created
377 self.assertEqual(self.get_mdc_stat("num_strays"), 1)
378 self.assertEqual(self.get_mdc_stat("strays_created"), 1)
379
380 # Wait, see that data objects are still present (i.e. that the
381 # stray did not advance to purging given time)
382 time.sleep(30)
383 self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
384 self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)
385
386 # See that before reintegration, the inode's backtrace points to a stray dir
387 self.fs.mds_asok(['flush', 'journal'])
388 self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))
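# (While unlinked but still referenced by the remote link, the inode lives
# under one of the MDS's internal stray directories, so its backtrace begins
# with "strayN" rather than a client-visible path.)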
389
390 last_reintegrated = self.get_mdc_stat("strays_reintegrated")
391
392         # Do a metadata operation on the remaining link (mv is heavy-handed, but
393         # others like touch may be satisfied from caps without poking the MDS)
394 self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])
395
396 # Stray reintegration should happen as a result of the eval_remote call
397 # on responding to a client request.
398 self.wait_until_equal(
399 lambda: self.get_mdc_stat("num_strays"),
400 expect_val=0,
401 timeout=60
402 )
403
404 # See the reintegration counter increment
405 curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
406 self.assertGreater(curr_reintegrated, last_reintegrated)
407 last_reintegrated = curr_reintegrated
408
409 # Flush the journal
410 self.fs.mds_asok(['flush', 'journal'])
411
412 # See that the backtrace for the file points to the remaining link's path
413 post_reint_bt = self.fs.read_backtrace(ino)
414 self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")
415
416         # The MDS should reintegrate the stray when the unlink finishes
417 self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
418 self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])
419
420 # Stray reintegration should happen as a result of the notify_stray call
421 # on completion of unlink
422 self.wait_until_equal(
423 lambda: self.get_mdc_stat("num_strays"),
424 expect_val=0,
425 timeout=60
426 )
427
428 # See the reintegration counter increment
429 curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
430 self.assertGreater(curr_reintegrated, last_reintegrated)
431 last_reintegrated = curr_reintegrated
432
433 # Flush the journal
434 self.fs.mds_asok(['flush', 'journal'])
435
436 # See that the backtrace for the file points to the newest link's path
437 post_reint_bt = self.fs.read_backtrace(ino)
438 self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")
439
440 # Now really delete it
441 self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
442 self._wait_for_counter("mds_cache", "strays_enqueued", 1)
443 self._wait_for_counter("purge_queue", "pq_executed", 1)
444
445 self.assert_purge_idle()
446 self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))
447
448 # We caused the inode to go stray 3 times
449 self.assertEqual(self.get_mdc_stat("strays_created"), 3)
450 # We purged it at the last
451 self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)
452
453 def test_mv_hardlink_cleanup(self):
454 """
455 That when doing a rename from A to B, and B has hardlinks,
456 then we make a stray for B which is then reintegrated
457         into one of its hardlinks.
458 """
459 # Create file_a, file_b, and a hardlink to file_b
460 size_mb = 8
461 self.mount_a.write_n_mb("file_a", size_mb)
462 file_a_ino = self.mount_a.path_to_ino("file_a")
463
464 self.mount_a.write_n_mb("file_b", size_mb)
465 file_b_ino = self.mount_a.path_to_ino("file_b")
466
467 self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
468 self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)
469
470 # mv file_a file_b
471 self.mount_a.run_shell(["mv", "file_a", "file_b"])
472
473 # Stray reintegration should happen as a result of the notify_stray call on
474 # completion of rename
475 self.wait_until_equal(
476 lambda: self.get_mdc_stat("num_strays"),
477 expect_val=0,
478 timeout=60
479 )
480
481 self.assertEqual(self.get_mdc_stat("strays_created"), 1)
482 self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)
483
484 # No data objects should have been deleted, as both files still have linkage.
485 self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
486 self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))
487
488 self.fs.mds_asok(['flush', 'journal'])
489
490 post_reint_bt = self.fs.read_backtrace(file_b_ino)
491 self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")
492
493 def _setup_two_ranks(self):
494 # Set up two MDSs
495 self.fs.set_allow_multimds(True)
496 self.fs.set_max_mds(2)
497
498 # See that we have two active MDSs
499 self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
500 reject_fn=lambda v: v > 2 or v < 1)
501
502 active_mds_names = self.fs.get_active_names()
503 rank_0_id = active_mds_names[0]
504 rank_1_id = active_mds_names[1]
505 log.info("Ranks 0 and 1 are {0} and {1}".format(
506 rank_0_id, rank_1_id))
507
508 # Get rid of other MDS daemons so that it's easier to know which
509 # daemons to expect in which ranks after restarts
510 for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}:
511 self.mds_cluster.mds_stop(unneeded_mds)
512 self.mds_cluster.mds_fail(unneeded_mds)
513
514 return rank_0_id, rank_1_id
515
516 def _force_migrate(self, to_id, path, watch_ino):
517 """
518 :param to_id: MDS id to move it to
519 :param path: Filesystem path (string) to move
520 :param watch_ino: Inode number to look for at destination to confirm move
521 :return: None
522 """
523 self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path])
524
525 # Poll the MDS cache dump to watch for the export completing
526 migrated = False
527 migrate_timeout = 60
528 migrate_elapsed = 0
529 while not migrated:
530 data = self.fs.mds_asok(["dump", "cache"], to_id)
531 for inode_data in data:
532 if inode_data['ino'] == watch_ino:
533 log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
534 if inode_data['is_auth'] is True:
535 migrated = True
536 break
537
538 if not migrated:
539 if migrate_elapsed > migrate_timeout:
540 raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed))
541 else:
542 migrate_elapsed += 1
543 time.sleep(1)
544
545 def _is_stopped(self, rank):
546 mds_map = self.fs.get_mds_map()
547 return rank not in [i['rank'] for i in mds_map['info'].values()]
548
549 def test_purge_on_shutdown(self):
550 """
551 That when an MDS rank is shut down, its purge queue is
552 drained in the process.
553 """
554 rank_0_id, rank_1_id = self._setup_two_ranks()
555
556 self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
557 self.mds_cluster.mds_fail_restart(rank_1_id)
558 self.fs.wait_for_daemons()
559
560 file_count = 5
561
562 self.mount_a.create_n_files("delete_me/file", file_count)
563
564 self._force_migrate(rank_1_id, "delete_me",
565 self.mount_a.path_to_ino("delete_me/file_0"))
566
567 self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
568 self.mount_a.umount_wait()
569
570         # See all the strays go into the purge queue
571 self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
572 self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
573 self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)
574
575         # See that nothing gets purged from the purge queue (yet)
576 time.sleep(10)
577 self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
578
579 # Shut down rank 1
580 self.fs.set_max_mds(1)
581 self.fs.deactivate(1)
582
583         # It shouldn't proceed past stopping because it's still not allowed
584 # to purge
585 time.sleep(10)
586 self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
587 self.assertFalse(self._is_stopped(1))
588
589 # Permit the daemon to start purging again
590 self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
591 'injectargs',
592 "--mds_max_purge_files 100")
593
594 # It should now proceed through shutdown
595 self.wait_until_true(
596 lambda: self._is_stopped(1),
597 timeout=60
598 )
599
600 # ...and in the process purge all that data
601 self.await_data_pool_empty()
602
603 def test_migration_on_shutdown(self):
604 """
605 That when an MDS rank is shut down, any non-purgeable strays
606 get migrated to another rank.
607 """
608
609 rank_0_id, rank_1_id = self._setup_two_ranks()
610
611 # Create a non-purgeable stray in a ~mds1 stray directory
612 # by doing a hard link and deleting the original file
613 self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"])
614 self.mount_a.run_shell(["touch", "dir_1/original"])
615 self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"])
616
617 self._force_migrate(rank_1_id, "dir_1",
618 self.mount_a.path_to_ino("dir_1/original"))
619
620         # Empty the MDS cache; otherwise the MDS reintegrates the stray when the unlink finishes
621 self.mount_a.umount_wait()
622 self.fs.mds_asok(['flush', 'journal'], rank_0_id)
623 self.fs.mds_asok(['flush', 'journal'], rank_1_id)
624 self.fs.mds_fail_restart()
625 self.fs.wait_for_daemons()
626
627 active_mds_names = self.fs.get_active_names()
628 rank_0_id = active_mds_names[0]
629 rank_1_id = active_mds_names[1]
630
631 self.mount_a.mount()
632
633 self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
634 self.mount_a.umount_wait()
635
636 self._wait_for_counter("mds_cache", "strays_created", 1,
637 mds_id=rank_1_id)
638
639 # Shut down rank 1
640 self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "1")
641 self.fs.mon_manager.raw_cluster_cmd_result('mds', 'deactivate', "1")
642
643         # Wait until we get to a single active MDS mdsmap state
644 self.wait_until_true(lambda: self._is_stopped(1), timeout=120)
645
646 # See that the stray counter on rank 0 has incremented
647 self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)
648
649 def assert_backtrace(self, ino, expected_path):
650 """
651 Assert that the backtrace in the data pool for an inode matches
652 an expected /foo/bar path.
653 """
654 expected_elements = expected_path.strip("/").split("/")
655 bt = self.fs.read_backtrace(ino)
656 actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']]))
657 self.assertListEqual(expected_elements, actual_elements)
658
659 def get_backtrace_path(self, ino):
660 bt = self.fs.read_backtrace(ino)
661 elements = reversed([dn['dname'] for dn in bt['ancestors']])
662 return "/".join(elements)
663
664 def assert_purge_idle(self):
665 """
666 Assert that the MDS perf counters indicate no strays exist and
667 no ongoing purge activity. Sanity check for when PurgeQueue should
668 be idle.
669 """
670 mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
671 pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
672 self.assertEqual(mdc_stats["num_strays"], 0)
673 self.assertEqual(mdc_stats["num_strays_delayed"], 0)
674 self.assertEqual(pq_stats["pq_executing"], 0)
675 self.assertEqual(pq_stats["pq_executing_ops"], 0)
676
677 def test_mv_cleanup(self):
678 """
679 That when doing a rename from A to B, and B has no hardlinks,
680         then we make a stray for B and purge it.
681 """
682 # Create file_a and file_b, write some to both
683 size_mb = 8
684 self.mount_a.write_n_mb("file_a", size_mb)
685 file_a_ino = self.mount_a.path_to_ino("file_a")
686 self.mount_a.write_n_mb("file_b", size_mb)
687 file_b_ino = self.mount_a.path_to_ino("file_b")
688
689 self.fs.mds_asok(['flush', 'journal'])
690 self.assert_backtrace(file_a_ino, "file_a")
691 self.assert_backtrace(file_b_ino, "file_b")
692
693 # mv file_a file_b
694 self.mount_a.run_shell(['mv', 'file_a', 'file_b'])
695
696 # See that stray counter increments
697 self.assertEqual(self.get_mdc_stat("strays_created"), 1)
698 # Wait for purge counter to increment
699 self._wait_for_counter("mds_cache", "strays_enqueued", 1)
700 self._wait_for_counter("purge_queue", "pq_executed", 1)
701
702 self.assert_purge_idle()
703
704 # file_b should have been purged
705 self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))
706
707 # Backtrace should have updated from file_a to file_b
708 self.fs.mds_asok(['flush', 'journal'])
709 self.assert_backtrace(file_a_ino, "file_b")
710
711 # file_a's data should still exist
712 self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
713
714 def _pool_df(self, pool_name):
715 """
716 Return a dict like
717 {
718 "kb_used": 0,
719 "bytes_used": 0,
720 "max_avail": 19630292406,
721 "objects": 0
722 }
723
724 :param pool_name: Which pool (must exist)
725 """
726 out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")
727 for p in json.loads(out)['pools']:
728 if p['name'] == pool_name:
729 return p['stats']
730
731 raise RuntimeError("Pool '{0}' not found".format(pool_name))
732
733 def await_data_pool_empty(self):
734 self.wait_until_true(
735 lambda: self._pool_df(
736 self.fs.get_data_pool_name()
737 )['objects'] == 0,
738 timeout=60)
739
740 def test_snapshot_remove(self):
741 """
742 That removal of a snapshot that references a now-unlinked file results
743         in purging of the stray for the file.
744 """
745 # Enable snapshots
746 self.fs.mon_manager.raw_cluster_cmd("mds", "set", "allow_new_snaps", "true",
747 "--yes-i-really-mean-it")
748
749 # Create a dir with a file in it
750 size_mb = 8
751 self.mount_a.run_shell(["mkdir", "snapdir"])
752 self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
753 self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
754 file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")
755
756 # Snapshot the dir
757 self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])
758
759 # Cause the head revision to deviate from the snapshot
760 self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)
761
762         # Flush the journal so that backtraces and dirfrag objects will actually be written
763 self.fs.mds_asok(["flush", "journal"])
764
765 # Unlink the file
766 self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
767 self.mount_a.run_shell(["rmdir", "snapdir/subdir"])
768
769         # Unmount the client so that when we come back to check that the data is
770         # still in the file, we are not just reading the client's page cache.
771 self.mount_a.umount_wait()
772
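# Two strays are expected: one for the unlinked file_a and one for the
# removed subdir.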
773 self.assertEqual(self.get_mdc_stat("strays_created"), 2)
774
775 # FIXME: at this stage we see a purge and the stray count drops to
776 # zero, but there's actually still a stray, so at the very
777 # least the StrayManager stats code is slightly off
778
779 self.mount_a.mount()
780
781 # See that the data from the snapshotted revision of the file is still present
782 # and correct
783 self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)
784
785 # Remove the snapshot
786 self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])
787 self.mount_a.umount_wait()
788
789 # Purging file_a doesn't happen until after we've flushed the journal, because
790 # it is referenced by the snapshotted subdir, and the snapshot isn't really
791 # gone until the journal references to it are gone
792 self.fs.mds_asok(["flush", "journal"])
793
794 # See that a purge happens now
795 self._wait_for_counter("mds_cache", "strays_enqueued", 2)
796 self._wait_for_counter("purge_queue", "pq_executed", 2)
797
798 self.assertTrue(self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024))
799 self.await_data_pool_empty()
800
801 def test_fancy_layout(self):
802 """
803 purge stray file with fancy layout
804 """
805
806 file_name = "fancy_layout_file"
807 self.mount_a.run_shell(["touch", file_name])
808
809 file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
810 self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)
811
812 # 35MB requires 7 objects
813 size_mb = 35
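# (With a 1MB stripe unit across 4 objects and an 8MB object size, one
# object set holds 32MB; the remaining 3MB spills one stripe unit each
# into 3 objects of the next set, giving 7 objects in total.)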
814 self.mount_a.write_n_mb(file_name, size_mb)
815
816 self.mount_a.run_shell(["rm", "-f", file_name])
817 self.fs.mds_asok(["flush", "journal"])
818
819         # can't use self.fs.data_objects_absent here; it does not support fancy layouts
820 self.await_data_pool_empty()
821
822 def test_dirfrag_limit(self):
823 """
824 That the directory fragment size cannot exceed mds_bal_fragment_size_max (using a limit of 50 in all configurations).
825
826 That fragmentation (forced) will allow more entries to be created.
827
828 That unlinking fails when the stray directory fragment becomes too large and that unlinking may continue once those strays are purged.
829 """
830
831 self.fs.set_allow_dirfrags(True)
832
833 LOW_LIMIT = 50
834 for mds in self.fs.get_daemon_names():
835 self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds)
836
837 try:
838 self.mount_a.run_python(dedent("""
839 import os
840 path = os.path.join("{path}", "subdir")
841 os.mkdir(path)
842 for n in range(0, {file_count}):
843 open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
844 """.format(
845 path=self.mount_a.mountpoint,
846 file_count=LOW_LIMIT+1
847 )))
848 except CommandFailedError:
849             pass # ENOSPC
850 else:
851 raise RuntimeError("fragment size exceeded")
852
853 # Now test that we can go beyond the limit if we fragment the directory
854
855 self.mount_a.run_python(dedent("""
856 import os
857 path = os.path.join("{path}", "subdir2")
858 os.mkdir(path)
859 for n in range(0, {file_count}):
860 open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
861 dfd = os.open(path, os.O_DIRECTORY)
862 os.fsync(dfd)
863 """.format(
864 path=self.mount_a.mountpoint,
865 file_count=LOW_LIMIT
866 )))
867
868 # Ensure that subdir2 is fragmented
869 mds_id = self.fs.get_active_names()[0]
870 self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id)
871
872 # remount+flush (release client caps)
873 self.mount_a.umount_wait()
874 self.fs.mds_asok(["flush", "journal"], mds_id)
875 self.mount_a.mount()
876 self.mount_a.wait_until_mounted()
877
878 # Create 50% more files than the current fragment limit
879 self.mount_a.run_python(dedent("""
880 import os
881 path = os.path.join("{path}", "subdir2")
882 for n in range({file_count}, ({file_count}*3)//2):
883 open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
884 """.format(
885 path=self.mount_a.mountpoint,
886 file_count=LOW_LIMIT
887 )))
888
889         # Now test that the stray directory size is limited and that unlinking recovers once strays are purged
890 strays_before = self.get_mdc_stat("strays_created")
891 try:
892 self.mount_a.run_python(dedent("""
893 import os
894 path = os.path.join("{path}", "subdir3")
895 os.mkdir(path)
896 for n in range({file_count}):
897 fpath = os.path.join(path, "%s" % n)
898 f = open(fpath, 'w')
899 f.write("%s" % n)
900 f.close()
901 os.unlink(fpath)
902 """.format(
903 path=self.mount_a.mountpoint,
904 file_count=LOW_LIMIT*10 # 10 stray directories, should collide before this count
905 )))
906 except CommandFailedError:
907             pass # ENOSPC
908 else:
909 raise RuntimeError("fragment size exceeded")
910
911 strays_after = self.get_mdc_stat("strays_created")
912 self.assertGreaterEqual(strays_after-strays_before, LOW_LIMIT)
913
914 self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
915 self._wait_for_counter("purge_queue", "pq_executed", strays_after)
916
917 self.mount_a.run_python(dedent("""
918 import os
919 path = os.path.join("{path}", "subdir4")
920 os.mkdir(path)
921 for n in range({file_count}):
922 fpath = os.path.join(path, "%s" % n)
923 f = open(fpath, 'w')
924 f.write("%s" % n)
925 f.close()
926 os.unlink(fpath)
927 """.format(
928 path=self.mount_a.mountpoint,
929 file_count=LOW_LIMIT
930 )))
931
932 def test_purge_queue_upgrade(self):
933 """
934 That when starting on a system with no purge queue in the metadata
935 pool, we silently create one.
936 :return:
937 """
938
939 self.mds_cluster.mds_stop()
940 self.mds_cluster.mds_fail()
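# 500.00000000 is the header object of the purge queue journal (inode 0x500)
# in the metadata pool; removing it simulates a filesystem created before
# the purge queue existed.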
941 self.fs.rados(["rm", "500.00000000"])
942 self.mds_cluster.mds_restart()
943 self.fs.wait_for_daemons()
944
945 def test_purge_queue_op_rate(self):
946 """
947 A busy purge queue is meant to aggregate operations sufficiently
948 that our RADOS ops to the metadata pool are not O(files). Check
949 that that is so.
950 :return:
951 """
952
953 # For low rates of deletion, the rate of metadata ops actually
954         # will be O(files), so to see the desired behaviour we have to give
955 # the system a significant quantity, i.e. an order of magnitude
956 # more than the number of files it will purge at one time.
957
958 max_purge_files = 2
959
960 self.set_conf('mds', 'mds_bal_frag', 'false')
961 self.set_conf('mds', 'mds_max_purge_files', "%d" % max_purge_files)
962 self.fs.mds_fail_restart()
963 self.fs.wait_for_daemons()
964
965 phase_1_files = 256
966 phase_2_files = 512
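# phase2 deletes twice as many files as phase1; the assertion at the end
# checks that the overhead ops grow sub-linearly (less than 1.25x).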
967
968 self.mount_a.run_shell(["mkdir", "phase1"])
969 self.mount_a.create_n_files("phase1/file", phase_1_files)
970
971 self.mount_a.run_shell(["mkdir", "phase2"])
972 self.mount_a.create_n_files("phase2/file", phase_2_files)
973
974 def unlink_and_count_ops(path, expected_deletions):
975 initial_ops = self.get_stat("objecter", "op")
976 initial_pq_executed = self.get_stat("purge_queue", "pq_executed")
977
978 self.mount_a.run_shell(["rm", "-rf", path])
979
980 self._wait_for_counter(
981 "purge_queue", "pq_executed", initial_pq_executed + expected_deletions
982 )
983
984 final_ops = self.get_stat("objecter", "op")
985
986             # Calculate the *overhead* operations, i.e. exclude the
987             # operations where we actually delete files.
988 return final_ops - initial_ops - expected_deletions
989
990 self.fs.mds_asok(['flush', 'journal'])
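# The "+ 1" below accounts for the directory inode itself, which also
# becomes a stray when the tree is removed.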
991 phase1_ops = unlink_and_count_ops("phase1/", phase_1_files + 1)
992
993 self.fs.mds_asok(['flush', 'journal'])
994 phase2_ops = unlink_and_count_ops("phase2/", phase_2_files + 1)
995
996 log.info("Phase 1: {0}".format(phase1_ops))
997 log.info("Phase 2: {0}".format(phase2_ops))
998
999 # The success criterion is that deleting double the number
1000 # of files doesn't generate double the number of overhead ops
1001 # -- this comparison is a rough approximation of that rule.
1002 self.assertTrue(phase2_ops < phase1_ops * 1.25)
1003
1004 # Finally, check that our activity did include properly quiescing
1005 # the queue (i.e. call to Journaler::write_head in the right place),
1006 # by restarting the MDS and checking that it doesn't try re-executing
1007 # any of the work we did.
1008 self.fs.mds_asok(['flush', 'journal']) # flush to ensure no strays
1009 # hanging around
1010 self.fs.mds_fail_restart()
1011 self.fs.wait_for_daemons()
1012 time.sleep(10)
1013 self.assertEqual(self.get_stat("purge_queue", "pq_executed"), 0)