]> git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/cephfs/test_strays.py
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / qa / tasks / cephfs / test_strays.py
1 import json
2 import time
3 import logging
4 from textwrap import dedent
5 import datetime
6 import gevent
7 import datetime
8
9 from teuthology.orchestra.run import CommandFailedError, Raw
10 from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology
11
12 log = logging.getLogger(__name__)
13
14
15 class TestStrays(CephFSTestCase):
16 MDSS_REQUIRED = 2
17
18 OPS_THROTTLE = 1
19 FILES_THROTTLE = 2
20
21 # Range of different file sizes used in throttle test's workload
22 throttle_workload_size_range = 16
23
24 @for_teuthology
25 def test_ops_throttle(self):
26 self._test_throttling(self.OPS_THROTTLE)
27
28 @for_teuthology
29 def test_files_throttle(self):
30 self._test_throttling(self.FILES_THROTTLE)
31
32 def test_dir_deletion(self):
33 """
34 That when deleting a bunch of dentries and the containing
35 directory, everything gets purged.
36 Catches cases where the client might e.g. fail to trim
37 the unlinked dir from its cache.
38 """
39 file_count = 1000
40 create_script = dedent("""
41 import os
42
43 mount_path = "{mount_path}"
44 subdir = "delete_me"
45 size = {size}
46 file_count = {file_count}
47 os.mkdir(os.path.join(mount_path, subdir))
48 for i in xrange(0, file_count):
49 filename = "{{0}}_{{1}}.bin".format(i, size)
50 f = open(os.path.join(mount_path, subdir, filename), 'w')
51 f.write(size * 'x')
52 f.close()
53 """.format(
54 mount_path=self.mount_a.mountpoint,
55 size=1024,
56 file_count=file_count
57 ))
58
59 self.mount_a.run_python(create_script)
60
61 # That the dirfrag object is created
62 self.fs.mds_asok(["flush", "journal"])
63 dir_ino = self.mount_a.path_to_ino("delete_me")
64 self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))
65
66 # Remove everything
67 self.mount_a.run_shell(["rm", "-rf", "delete_me"])
68 self.fs.mds_asok(["flush", "journal"])
69
70 # That all the removed files get created as strays
71 strays = self.get_mdc_stat("strays_created")
72 self.assertEqual(strays, file_count + 1)
73
74 # That the strays all get enqueued for purge
75 self.wait_until_equal(
76 lambda: self.get_mdc_stat("strays_enqueued"),
77 strays,
78 timeout=600
79
80 )
81
82 # That all the purge operations execute
83 self.wait_until_equal(
84 lambda: self.get_stat("purge_queue", "pq_executed"),
85 strays,
86 timeout=600
87 )
88
89 # That finally, the directory metadata object is gone
90 self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))
91
92 # That finally, the data objects are all gone
93 self.await_data_pool_empty()
94
95 def _test_throttling(self, throttle_type):
96 self.data_log = []
97 try:
98 return self._do_test_throttling(throttle_type)
99 except:
100 for l in self.data_log:
101 log.info(",".join([l_.__str__() for l_ in l]))
102 raise
103
104 def _do_test_throttling(self, throttle_type):
105 """
106 That the mds_max_purge_ops setting is respected
107 """
108
109 def set_throttles(files, ops):
110 """
111 Helper for updating ops/files limits, and calculating effective
112 ops_per_pg setting to give the same ops limit.
113 """
114 self.set_conf('mds', 'mds_max_purge_files', "%d" % files)
115 self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops)
116
117 pgs = self.fs.mon_manager.get_pool_property(
118 self.fs.get_data_pool_name(),
119 "pg_num"
120 )
121 ops_per_pg = float(ops) / pgs
122 self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg)
123
124 # Test conditions depend on what we're going to be exercising.
125 # * Lift the threshold on whatever throttle we are *not* testing, so
126 # that the throttle of interest is the one that will be the bottleneck
127 # * Create either many small files (test file count throttling) or fewer
128 # large files (test op throttling)
129 if throttle_type == self.OPS_THROTTLE:
130 set_throttles(files=100000000, ops=16)
131 size_unit = 1024 * 1024 # big files, generate lots of ops
132 file_multiplier = 100
133 elif throttle_type == self.FILES_THROTTLE:
134 # The default value of file limit is pretty permissive, so to avoid
135 # the test running too fast, create lots of files and set the limit
136 # pretty low.
137 set_throttles(ops=100000000, files=6)
138 size_unit = 1024 # small, numerous files
139 file_multiplier = 200
140 else:
141 raise NotImplemented(throttle_type)
142
143 # Pick up config changes
144 self.fs.mds_fail_restart()
145 self.fs.wait_for_daemons()
146
147 create_script = dedent("""
148 import os
149
150 mount_path = "{mount_path}"
151 subdir = "delete_me"
152 size_unit = {size_unit}
153 file_multiplier = {file_multiplier}
154 os.mkdir(os.path.join(mount_path, subdir))
155 for i in xrange(0, file_multiplier):
156 for size in xrange(0, {size_range}*size_unit, size_unit):
157 filename = "{{0}}_{{1}}.bin".format(i, size / size_unit)
158 f = open(os.path.join(mount_path, subdir, filename), 'w')
159 f.write(size * 'x')
160 f.close()
161 """.format(
162 mount_path=self.mount_a.mountpoint,
163 size_unit=size_unit,
164 file_multiplier=file_multiplier,
165 size_range=self.throttle_workload_size_range
166 ))
167
168 self.mount_a.run_python(create_script)
169
170 # We will run the deletion in the background, to reduce the risk of it completing before
171 # we have started monitoring the stray statistics.
172 def background():
173 self.mount_a.run_shell(["rm", "-rf", "delete_me"])
174 self.fs.mds_asok(["flush", "journal"])
175
176 background_thread = gevent.spawn(background)
177
178 total_inodes = file_multiplier * self.throttle_workload_size_range + 1
179 mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds'))
180 mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds'))
181
182 # During this phase we look for the concurrent ops to exceed half
183 # the limit (a heuristic) and not exceed the limit (a correctness
184 # condition).
185 purge_timeout = 600
186 elapsed = 0
187 files_high_water = 0
188 ops_high_water = 0
189
190 while True:
191 stats = self.fs.mds_asok(['perf', 'dump'])
192 mdc_stats = stats['mds_cache']
193 pq_stats = stats['purge_queue']
194 if elapsed >= purge_timeout:
195 raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))
196
197 num_strays = mdc_stats['num_strays']
198 num_strays_purging = pq_stats['pq_executing']
199 num_purge_ops = pq_stats['pq_executing_ops']
200
201 self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops])
202
203 files_high_water = max(files_high_water, num_strays_purging)
204 ops_high_water = max(ops_high_water, num_purge_ops)
205
206 total_strays_created = mdc_stats['strays_created']
207 total_strays_purged = pq_stats['pq_executed']
208
209 if total_strays_purged == total_inodes:
210 log.info("Complete purge in {0} seconds".format(elapsed))
211 break
212 elif total_strays_purged > total_inodes:
213 raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))
214 else:
215 if throttle_type == self.OPS_THROTTLE:
216 # 11 is filer_max_purge_ops plus one for the backtrace:
217 # limit is allowed to be overshot by this much.
218 if num_purge_ops > mds_max_purge_ops + 11:
219 raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
220 num_purge_ops, mds_max_purge_ops
221 ))
222 elif throttle_type == self.FILES_THROTTLE:
223 if num_strays_purging > mds_max_purge_files:
224 raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
225 num_strays_purging, mds_max_purge_files
226 ))
227 else:
228 raise NotImplemented(throttle_type)
229
230 log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
231 num_strays_purging, num_strays,
232 total_strays_purged, total_strays_created
233 ))
234 time.sleep(1)
235 elapsed += 1
236
237 background_thread.join()
238
239 # Check that we got up to a respectable rate during the purge. This is totally
240 # racy, but should be safeish unless the cluster is pathologically slow, or
241 # insanely fast such that the deletions all pass before we have polled the
242 # statistics.
243 if throttle_type == self.OPS_THROTTLE:
244 if ops_high_water < mds_max_purge_ops / 2:
245 raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
246 ops_high_water, mds_max_purge_ops
247 ))
248 elif throttle_type == self.FILES_THROTTLE:
249 if files_high_water < mds_max_purge_files / 2:
250 raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
251 ops_high_water, mds_max_purge_files
252 ))
253
254 # Sanity check all MDC stray stats
255 stats = self.fs.mds_asok(['perf', 'dump'])
256 mdc_stats = stats['mds_cache']
257 pq_stats = stats['purge_queue']
258 self.assertEqual(mdc_stats['num_strays'], 0)
259 self.assertEqual(mdc_stats['num_strays_delayed'], 0)
260 self.assertEqual(pq_stats['pq_executing'], 0)
261 self.assertEqual(pq_stats['pq_executing_ops'], 0)
262 self.assertEqual(mdc_stats['strays_created'], total_inodes)
263 self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
264 self.assertEqual(pq_stats['pq_executed'], total_inodes)
265
266 def get_mdc_stat(self, name, mds_id=None):
267 return self.get_stat("mds_cache", name, mds_id)
268
269 def get_stat(self, subsys, name, mds_id=None):
270 return self.fs.mds_asok(['perf', 'dump', subsys, name],
271 mds_id=mds_id)[subsys][name]
272
273 def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
274 mds_id=None):
275 self.wait_until_equal(
276 lambda: self.get_stat(subsys, counter, mds_id),
277 expect_val=expect_val, timeout=timeout,
278 reject_fn=lambda x: x > expect_val
279 )
280
281 def test_open_inode(self):
282 """
283 That the case of a dentry unlinked while a client holds an
284 inode open is handled correctly.
285
286 The inode should be moved into a stray dentry, while the original
287 dentry and directory should be purged.
288
289 The inode's data should be purged when the client eventually closes
290 it.
291 """
292 mount_a_client_id = self.mount_a.get_global_id()
293
294 # Write some bytes to a file
295 size_mb = 8
296
297 # Hold the file open
298 p = self.mount_a.open_background("open_file")
299 self.mount_a.write_n_mb("open_file", size_mb)
300 open_file_ino = self.mount_a.path_to_ino("open_file")
301
302 self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)
303
304 # Unlink the dentry
305 self.mount_a.run_shell(["rm", "-f", "open_file"])
306
307 # Wait to see the stray count increment
308 self.wait_until_equal(
309 lambda: self.get_mdc_stat("num_strays"),
310 expect_val=1, timeout=60, reject_fn=lambda x: x > 1)
311
312 # See that while the stray count has incremented, none have passed
313 # on to the purge queue
314 self.assertEqual(self.get_mdc_stat("strays_created"), 1)
315 self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)
316
317 # See that the client still holds 2 caps
318 self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)
319
320 # See that the data objects remain in the data pool
321 self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))
322
323 # Now close the file
324 self.mount_a.kill_background(p)
325
326 # Wait to see the client cap count decrement
327 self.wait_until_equal(
328 lambda: self.get_session(mount_a_client_id)['num_caps'],
329 expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
330 )
331 # Wait to see the purge counter increment, stray count go to zero
332 self._wait_for_counter("mds_cache", "strays_enqueued", 1)
333 self.wait_until_equal(
334 lambda: self.get_mdc_stat("num_strays"),
335 expect_val=0, timeout=6, reject_fn=lambda x: x > 1
336 )
337 self._wait_for_counter("purge_queue", "pq_executed", 1)
338
339 # See that the data objects no longer exist
340 self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))
341
342 self.await_data_pool_empty()
343
344 def test_hardlink_reintegration(self):
345 """
346 That removal of primary dentry of hardlinked inode results
347 in reintegration of inode into the previously-remote dentry,
348 rather than lingering as a stray indefinitely.
349 """
350 # Write some bytes to file_a
351 size_mb = 8
352 self.mount_a.run_shell(["mkdir", "dir_1"])
353 self.mount_a.write_n_mb("dir_1/file_a", size_mb)
354 ino = self.mount_a.path_to_ino("dir_1/file_a")
355
356 # Create a hardlink named file_b
357 self.mount_a.run_shell(["mkdir", "dir_2"])
358 self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
359 self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)
360
361 # Flush journal
362 self.fs.mds_asok(['flush', 'journal'])
363
364 # See that backtrace for the file points to the file_a path
365 pre_unlink_bt = self.fs.read_backtrace(ino)
366 self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")
367
368 # empty mds cache. otherwise mds reintegrates stray when unlink finishes
369 self.mount_a.umount_wait()
370 self.fs.mds_asok(['flush', 'journal'])
371 self.fs.mds_fail_restart()
372 self.fs.wait_for_daemons()
373 self.mount_a.mount()
374
375 # Unlink file_a
376 self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])
377
378 # See that a stray was created
379 self.assertEqual(self.get_mdc_stat("num_strays"), 1)
380 self.assertEqual(self.get_mdc_stat("strays_created"), 1)
381
382 # Wait, see that data objects are still present (i.e. that the
383 # stray did not advance to purging given time)
384 time.sleep(30)
385 self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
386 self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)
387
388 # See that before reintegration, the inode's backtrace points to a stray dir
389 self.fs.mds_asok(['flush', 'journal'])
390 self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))
391
392 last_reintegrated = self.get_mdc_stat("strays_reintegrated")
393
394 # Do a metadata operation on the remaining link (mv is heavy handed, but
395 # others like touch may be satisfied from caps without poking MDS)
396 self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])
397
398 # Stray reintegration should happen as a result of the eval_remote call
399 # on responding to a client request.
400 self.wait_until_equal(
401 lambda: self.get_mdc_stat("num_strays"),
402 expect_val=0,
403 timeout=60
404 )
405
406 # See the reintegration counter increment
407 curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
408 self.assertGreater(curr_reintegrated, last_reintegrated)
409 last_reintegrated = curr_reintegrated
410
411 # Flush the journal
412 self.fs.mds_asok(['flush', 'journal'])
413
414 # See that the backtrace for the file points to the remaining link's path
415 post_reint_bt = self.fs.read_backtrace(ino)
416 self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")
417
418 # mds should reintegrates stray when unlink finishes
419 self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
420 self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])
421
422 # Stray reintegration should happen as a result of the notify_stray call
423 # on completion of unlink
424 self.wait_until_equal(
425 lambda: self.get_mdc_stat("num_strays"),
426 expect_val=0,
427 timeout=60
428 )
429
430 # See the reintegration counter increment
431 curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
432 self.assertGreater(curr_reintegrated, last_reintegrated)
433 last_reintegrated = curr_reintegrated
434
435 # Flush the journal
436 self.fs.mds_asok(['flush', 'journal'])
437
438 # See that the backtrace for the file points to the newest link's path
439 post_reint_bt = self.fs.read_backtrace(ino)
440 self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")
441
442 # Now really delete it
443 self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
444 self._wait_for_counter("mds_cache", "strays_enqueued", 1)
445 self._wait_for_counter("purge_queue", "pq_executed", 1)
446
447 self.assert_purge_idle()
448 self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))
449
450 # We caused the inode to go stray 3 times
451 self.assertEqual(self.get_mdc_stat("strays_created"), 3)
452 # We purged it at the last
453 self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)
454
455 def test_mv_hardlink_cleanup(self):
456 """
457 That when doing a rename from A to B, and B has hardlinks,
458 then we make a stray for B which is then reintegrated
459 into one of his hardlinks.
460 """
461 # Create file_a, file_b, and a hardlink to file_b
462 size_mb = 8
463 self.mount_a.write_n_mb("file_a", size_mb)
464 file_a_ino = self.mount_a.path_to_ino("file_a")
465
466 self.mount_a.write_n_mb("file_b", size_mb)
467 file_b_ino = self.mount_a.path_to_ino("file_b")
468
469 self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
470 self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)
471
472 # mv file_a file_b
473 self.mount_a.run_shell(["mv", "file_a", "file_b"])
474
475 # Stray reintegration should happen as a result of the notify_stray call on
476 # completion of rename
477 self.wait_until_equal(
478 lambda: self.get_mdc_stat("num_strays"),
479 expect_val=0,
480 timeout=60
481 )
482
483 self.assertEqual(self.get_mdc_stat("strays_created"), 1)
484 self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)
485
486 # No data objects should have been deleted, as both files still have linkage.
487 self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
488 self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))
489
490 self.fs.mds_asok(['flush', 'journal'])
491
492 post_reint_bt = self.fs.read_backtrace(file_b_ino)
493 self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")
494
495 def _setup_two_ranks(self):
496 # Set up two MDSs
497 self.fs.set_max_mds(2)
498
499 # See that we have two active MDSs
500 self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
501 reject_fn=lambda v: v > 2 or v < 1)
502
503 active_mds_names = self.fs.get_active_names()
504 rank_0_id = active_mds_names[0]
505 rank_1_id = active_mds_names[1]
506 log.info("Ranks 0 and 1 are {0} and {1}".format(
507 rank_0_id, rank_1_id))
508
509 # Get rid of other MDS daemons so that it's easier to know which
510 # daemons to expect in which ranks after restarts
511 for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}:
512 self.mds_cluster.mds_stop(unneeded_mds)
513 self.mds_cluster.mds_fail(unneeded_mds)
514
515 return rank_0_id, rank_1_id
516
517 def _force_migrate(self, to_id, path, watch_ino):
518 """
519 :param to_id: MDS id to move it to
520 :param path: Filesystem path (string) to move
521 :param watch_ino: Inode number to look for at destination to confirm move
522 :return: None
523 """
524 self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path])
525
526 # Poll the MDS cache dump to watch for the export completing
527 migrated = False
528 migrate_timeout = 60
529 migrate_elapsed = 0
530 while not migrated:
531 data = self.fs.mds_asok(["dump", "cache"], to_id)
532 for inode_data in data:
533 if inode_data['ino'] == watch_ino:
534 log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
535 if inode_data['is_auth'] is True:
536 migrated = True
537 break
538
539 if not migrated:
540 if migrate_elapsed > migrate_timeout:
541 raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed))
542 else:
543 migrate_elapsed += 1
544 time.sleep(1)
545
546 def _is_stopped(self, rank):
547 mds_map = self.fs.get_mds_map()
548 return rank not in [i['rank'] for i in mds_map['info'].values()]
549
550 def test_purge_on_shutdown(self):
551 """
552 That when an MDS rank is shut down, its purge queue is
553 drained in the process.
554 """
555 rank_0_id, rank_1_id = self._setup_two_ranks()
556
557 self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
558 self.mds_cluster.mds_fail_restart(rank_1_id)
559 self.fs.wait_for_daemons()
560
561 file_count = 5
562
563 self.mount_a.create_n_files("delete_me/file", file_count)
564
565 self._force_migrate(rank_1_id, "delete_me",
566 self.mount_a.path_to_ino("delete_me/file_0"))
567
568 self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
569 self.mount_a.umount_wait()
570
571 # See all the strays go into purge queue
572 self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
573 self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
574 self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)
575
576 # See nothing get purged from the purge queue (yet)
577 time.sleep(10)
578 self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
579
580 # Shut down rank 1
581 self.fs.set_max_mds(1)
582
583 # It shouldn't proceed past stopping because its still not allowed
584 # to purge
585 time.sleep(10)
586 self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
587 self.assertFalse(self._is_stopped(1))
588
589 # Permit the daemon to start purging again
590 self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
591 'injectargs',
592 "--mds_max_purge_files 100")
593
594 # It should now proceed through shutdown
595 self.fs.wait_for_daemons(timeout=120)
596
597 # ...and in the process purge all that data
598 self.await_data_pool_empty()
599
600 def test_migration_on_shutdown(self):
601 """
602 That when an MDS rank is shut down, any non-purgeable strays
603 get migrated to another rank.
604 """
605
606 rank_0_id, rank_1_id = self._setup_two_ranks()
607
608 # Create a non-purgeable stray in a ~mds1 stray directory
609 # by doing a hard link and deleting the original file
610 self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"])
611 self.mount_a.run_shell(["touch", "dir_1/original"])
612 self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"])
613
614 self._force_migrate(rank_1_id, "dir_1",
615 self.mount_a.path_to_ino("dir_1/original"))
616
617 # empty mds cache. otherwise mds reintegrates stray when unlink finishes
618 self.mount_a.umount_wait()
619 self.fs.mds_asok(['flush', 'journal'], rank_0_id)
620 self.fs.mds_asok(['flush', 'journal'], rank_1_id)
621 self.fs.mds_fail_restart()
622 self.fs.wait_for_daemons()
623
624 active_mds_names = self.fs.get_active_names()
625 rank_0_id = active_mds_names[0]
626 rank_1_id = active_mds_names[1]
627
628 self.mount_a.mount()
629
630 self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
631 self.mount_a.umount_wait()
632
633 self._wait_for_counter("mds_cache", "strays_created", 1,
634 mds_id=rank_1_id)
635
636 # Shut down rank 1
637 self.fs.set_max_mds(1)
638 self.fs.wait_for_daemons(timeout=120)
639
640 # See that the stray counter on rank 0 has incremented
641 self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)
642
643 def assert_backtrace(self, ino, expected_path):
644 """
645 Assert that the backtrace in the data pool for an inode matches
646 an expected /foo/bar path.
647 """
648 expected_elements = expected_path.strip("/").split("/")
649 bt = self.fs.read_backtrace(ino)
650 actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']]))
651 self.assertListEqual(expected_elements, actual_elements)
652
653 def get_backtrace_path(self, ino):
654 bt = self.fs.read_backtrace(ino)
655 elements = reversed([dn['dname'] for dn in bt['ancestors']])
656 return "/".join(elements)
657
658 def assert_purge_idle(self):
659 """
660 Assert that the MDS perf counters indicate no strays exist and
661 no ongoing purge activity. Sanity check for when PurgeQueue should
662 be idle.
663 """
664 mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
665 pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
666 self.assertEqual(mdc_stats["num_strays"], 0)
667 self.assertEqual(mdc_stats["num_strays_delayed"], 0)
668 self.assertEqual(pq_stats["pq_executing"], 0)
669 self.assertEqual(pq_stats["pq_executing_ops"], 0)
670
671 def test_mv_cleanup(self):
672 """
673 That when doing a rename from A to B, and B has no hardlinks,
674 then we make a stray for B and purge him.
675 """
676 # Create file_a and file_b, write some to both
677 size_mb = 8
678 self.mount_a.write_n_mb("file_a", size_mb)
679 file_a_ino = self.mount_a.path_to_ino("file_a")
680 self.mount_a.write_n_mb("file_b", size_mb)
681 file_b_ino = self.mount_a.path_to_ino("file_b")
682
683 self.fs.mds_asok(['flush', 'journal'])
684 self.assert_backtrace(file_a_ino, "file_a")
685 self.assert_backtrace(file_b_ino, "file_b")
686
687 # mv file_a file_b
688 self.mount_a.run_shell(['mv', 'file_a', 'file_b'])
689
690 # See that stray counter increments
691 self.assertEqual(self.get_mdc_stat("strays_created"), 1)
692 # Wait for purge counter to increment
693 self._wait_for_counter("mds_cache", "strays_enqueued", 1)
694 self._wait_for_counter("purge_queue", "pq_executed", 1)
695
696 self.assert_purge_idle()
697
698 # file_b should have been purged
699 self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))
700
701 # Backtrace should have updated from file_a to file_b
702 self.fs.mds_asok(['flush', 'journal'])
703 self.assert_backtrace(file_a_ino, "file_b")
704
705 # file_a's data should still exist
706 self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
707
708 def _pool_df(self, pool_name):
709 """
710 Return a dict like
711 {
712 "kb_used": 0,
713 "bytes_used": 0,
714 "max_avail": 19630292406,
715 "objects": 0
716 }
717
718 :param pool_name: Which pool (must exist)
719 """
720 out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")
721 for p in json.loads(out)['pools']:
722 if p['name'] == pool_name:
723 return p['stats']
724
725 raise RuntimeError("Pool '{0}' not found".format(pool_name))
726
727 def await_data_pool_empty(self):
728 self.wait_until_true(
729 lambda: self._pool_df(
730 self.fs.get_data_pool_name()
731 )['objects'] == 0,
732 timeout=60)
733
734 def test_snapshot_remove(self):
735 """
736 That removal of a snapshot that references a now-unlinked file results
737 in purging on the stray for the file.
738 """
739 # Enable snapshots
740 self.fs.set_allow_new_snaps(True)
741
742 # Create a dir with a file in it
743 size_mb = 8
744 self.mount_a.run_shell(["mkdir", "snapdir"])
745 self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
746 self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
747 file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")
748
749 # Snapshot the dir
750 self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])
751
752 # Cause the head revision to deviate from the snapshot
753 self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)
754
755 # Flush the journal so that backtraces, dirfrag objects will actually be written
756 self.fs.mds_asok(["flush", "journal"])
757
758 # Unlink the file
759 self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
760 self.mount_a.run_shell(["rmdir", "snapdir/subdir"])
761
762 # Unmount the client because when I come back to check the data is still
763 # in the file I don't want to just see what's in the page cache.
764 self.mount_a.umount_wait()
765
766 self.assertEqual(self.get_mdc_stat("strays_created"), 2)
767
768 # FIXME: at this stage we see a purge and the stray count drops to
769 # zero, but there's actually still a stray, so at the very
770 # least the StrayManager stats code is slightly off
771
772 self.mount_a.mount()
773
774 # See that the data from the snapshotted revision of the file is still present
775 # and correct
776 self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)
777
778 # Remove the snapshot
779 self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])
780
781 # Purging file_a doesn't happen until after we've flushed the journal, because
782 # it is referenced by the snapshotted subdir, and the snapshot isn't really
783 # gone until the journal references to it are gone
784 self.fs.mds_asok(["flush", "journal"])
785
786 # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs.
787 # See also: http://tracker.ceph.com/issues/20072
788 self.wait_until_true(
789 lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024),
790 timeout=60
791 )
792
793 # See that a purge happens now
794 self._wait_for_counter("mds_cache", "strays_enqueued", 2)
795 self._wait_for_counter("purge_queue", "pq_executed", 2)
796
797 self.await_data_pool_empty()
798
799 def test_fancy_layout(self):
800 """
801 purge stray file with fancy layout
802 """
803
804 file_name = "fancy_layout_file"
805 self.mount_a.run_shell(["touch", file_name])
806
807 file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
808 self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)
809
810 # 35MB requires 7 objects
811 size_mb = 35
812 self.mount_a.write_n_mb(file_name, size_mb)
813
814 self.mount_a.run_shell(["rm", "-f", file_name])
815 self.fs.mds_asok(["flush", "journal"])
816
817 # can't use self.fs.data_objects_absent here, it does not support fancy layout
818 self.await_data_pool_empty()
819
820 def test_dirfrag_limit(self):
821 """
822 That the directory fragment size cannot exceed mds_bal_fragment_size_max (using a limit of 50 in all configurations).
823
824 That fragmentation (forced) will allow more entries to be created.
825
826 That unlinking fails when the stray directory fragment becomes too large and that unlinking may continue once those strays are purged.
827 """
828
829 LOW_LIMIT = 50
830 for mds in self.fs.get_daemon_names():
831 self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds)
832
833 try:
834 self.mount_a.run_python(dedent("""
835 import os
836 path = os.path.join("{path}", "subdir")
837 os.mkdir(path)
838 for n in range(0, {file_count}):
839 open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
840 """.format(
841 path=self.mount_a.mountpoint,
842 file_count=LOW_LIMIT+1
843 )))
844 except CommandFailedError:
845 pass # ENOSPAC
846 else:
847 raise RuntimeError("fragment size exceeded")
848
849 # Now test that we can go beyond the limit if we fragment the directory
850
851 self.mount_a.run_python(dedent("""
852 import os
853 path = os.path.join("{path}", "subdir2")
854 os.mkdir(path)
855 for n in range(0, {file_count}):
856 open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
857 dfd = os.open(path, os.O_DIRECTORY)
858 os.fsync(dfd)
859 """.format(
860 path=self.mount_a.mountpoint,
861 file_count=LOW_LIMIT
862 )))
863
864 # Ensure that subdir2 is fragmented
865 mds_id = self.fs.get_active_names()[0]
866 self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id)
867
868 # remount+flush (release client caps)
869 self.mount_a.umount_wait()
870 self.fs.mds_asok(["flush", "journal"], mds_id)
871 self.mount_a.mount()
872 self.mount_a.wait_until_mounted()
873
874 # Create 50% more files than the current fragment limit
875 self.mount_a.run_python(dedent("""
876 import os
877 path = os.path.join("{path}", "subdir2")
878 for n in range({file_count}, ({file_count}*3)//2):
879 open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
880 """.format(
881 path=self.mount_a.mountpoint,
882 file_count=LOW_LIMIT
883 )))
884
885 # Now test the stray directory size is limited and recovers
886 strays_before = self.get_mdc_stat("strays_created")
887 try:
888 self.mount_a.run_python(dedent("""
889 import os
890 path = os.path.join("{path}", "subdir3")
891 os.mkdir(path)
892 for n in range({file_count}):
893 fpath = os.path.join(path, "%s" % n)
894 f = open(fpath, 'w')
895 f.write("%s" % n)
896 f.close()
897 os.unlink(fpath)
898 """.format(
899 path=self.mount_a.mountpoint,
900 file_count=LOW_LIMIT*10 # 10 stray directories, should collide before this count
901 )))
902 except CommandFailedError:
903 pass # ENOSPAC
904 else:
905 raise RuntimeError("fragment size exceeded")
906
907 strays_after = self.get_mdc_stat("strays_created")
908 self.assertGreaterEqual(strays_after-strays_before, LOW_LIMIT)
909
910 self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
911 self._wait_for_counter("purge_queue", "pq_executed", strays_after)
912
913 self.mount_a.run_python(dedent("""
914 import os
915 path = os.path.join("{path}", "subdir4")
916 os.mkdir(path)
917 for n in range({file_count}):
918 fpath = os.path.join(path, "%s" % n)
919 f = open(fpath, 'w')
920 f.write("%s" % n)
921 f.close()
922 os.unlink(fpath)
923 """.format(
924 path=self.mount_a.mountpoint,
925 file_count=LOW_LIMIT
926 )))
927
928 def test_purge_queue_upgrade(self):
929 """
930 That when starting on a system with no purge queue in the metadata
931 pool, we silently create one.
932 :return:
933 """
934
935 self.mds_cluster.mds_stop()
936 self.mds_cluster.mds_fail()
937 self.fs.rados(["rm", "500.00000000"])
938 self.mds_cluster.mds_restart()
939 self.fs.wait_for_daemons()
940
941 def test_replicated_delete_speed(self):
942 """
943 That deletions of replicated metadata are not pathologically slow
944 """
945 rank_0_id, rank_1_id = self._setup_two_ranks()
946
947 self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
948 self.mds_cluster.mds_fail_restart(rank_1_id)
949 self.fs.wait_for_daemons()
950
951 file_count = 10
952
953 self.mount_a.create_n_files("delete_me/file", file_count)
954
955 self._force_migrate(rank_1_id, "delete_me",
956 self.mount_a.path_to_ino("delete_me/file_0"))
957
958 begin = datetime.datetime.now()
959 self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
960 end = datetime.datetime.now()
961
962 # What we're really checking here is that we are completing client
963 # operations immediately rather than delaying until the next tick.
964 tick_period = float(self.fs.get_config("mds_tick_interval",
965 service_type="mds"))
966
967 duration = (end - begin).total_seconds()
968 self.assertLess(duration, (file_count * tick_period) * 0.25)
969