]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | import json |
2 | import time | |
3 | import logging | |
4 | from textwrap import dedent | |
5 | import datetime | |
6 | import gevent | |
224ce89b WB |
7 | import datetime |
8 | ||
7c673cae FG |
9 | from teuthology.orchestra.run import CommandFailedError, Raw |
10 | from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology | |
11 | ||
12 | log = logging.getLogger(__name__) | |
13 | ||
14 | ||
15 | class TestStrays(CephFSTestCase): | |
16 | MDSS_REQUIRED = 2 | |
17 | ||
18 | OPS_THROTTLE = 1 | |
19 | FILES_THROTTLE = 2 | |
20 | ||
21 | # Range of different file sizes used in throttle test's workload | |
22 | throttle_workload_size_range = 16 | |
23 | ||
24 | @for_teuthology | |
25 | def test_ops_throttle(self): | |
26 | self._test_throttling(self.OPS_THROTTLE) | |
27 | ||
28 | @for_teuthology | |
29 | def test_files_throttle(self): | |
30 | self._test_throttling(self.FILES_THROTTLE) | |
31 | ||
    def test_dir_deletion(self):
        """
        That when deleting a bunch of dentries and the containing
        directory, everything gets purged.
        Catches cases where the client might e.g. fail to trim
        the unlinked dir from its cache.
        """
        file_count = 1000
        # Script executed on the client mount via run_python.
        # NOTE(review): xrange implies the remote interpreter is Python 2 —
        # confirm before porting; do not change the string casually.
        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size = {size}
            file_count = {file_count}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_count):
                filename = "{{0}}_{{1}}.bin".format(i, size)
                f = open(os.path.join(mount_path, subdir, filename), 'w')
                f.write(size * 'x')
                f.close()
        """.format(
            mount_path=self.mount_a.mountpoint,
            size=1024,
            file_count=file_count
        ))

        self.mount_a.run_python(create_script)

        # That the dirfrag object is created
        self.fs.mds_asok(["flush", "journal"])
        dir_ino = self.mount_a.path_to_ino("delete_me")
        self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))

        # Remove everything
        self.mount_a.run_shell(["rm", "-rf", "delete_me"])
        self.fs.mds_asok(["flush", "journal"])

        # That all the removed files get created as strays
        # (+1 accounts for the containing directory itself)
        strays = self.get_mdc_stat("strays_created")
        self.assertEqual(strays, file_count + 1)

        # That the strays all get enqueued for purge
        self.wait_until_equal(
            lambda: self.get_mdc_stat("strays_enqueued"),
            strays,
            timeout=600
        )

        # That all the purge operations execute
        self.wait_until_equal(
            lambda: self.get_stat("purge_queue", "pq_executed"),
            strays,
            timeout=600
        )

        # That finally, the directory metadata object is gone
        self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))

        # That finally, the data objects are all gone
        self.await_data_pool_empty()
94 | ||
95 | def _test_throttling(self, throttle_type): | |
96 | self.data_log = [] | |
97 | try: | |
98 | return self._do_test_throttling(throttle_type) | |
99 | except: | |
100 | for l in self.data_log: | |
101 | log.info(",".join([l_.__str__() for l_ in l])) | |
102 | raise | |
103 | ||
104 | def _do_test_throttling(self, throttle_type): | |
105 | """ | |
106 | That the mds_max_purge_ops setting is respected | |
107 | """ | |
108 | ||
109 | def set_throttles(files, ops): | |
110 | """ | |
111 | Helper for updating ops/files limits, and calculating effective | |
112 | ops_per_pg setting to give the same ops limit. | |
113 | """ | |
114 | self.set_conf('mds', 'mds_max_purge_files', "%d" % files) | |
115 | self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops) | |
116 | ||
117 | pgs = self.fs.mon_manager.get_pool_property( | |
118 | self.fs.get_data_pool_name(), | |
119 | "pg_num" | |
120 | ) | |
121 | ops_per_pg = float(ops) / pgs | |
122 | self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg) | |
123 | ||
124 | # Test conditions depend on what we're going to be exercising. | |
125 | # * Lift the threshold on whatever throttle we are *not* testing, so | |
126 | # that the throttle of interest is the one that will be the bottleneck | |
127 | # * Create either many small files (test file count throttling) or fewer | |
128 | # large files (test op throttling) | |
129 | if throttle_type == self.OPS_THROTTLE: | |
130 | set_throttles(files=100000000, ops=16) | |
131 | size_unit = 1024 * 1024 # big files, generate lots of ops | |
132 | file_multiplier = 100 | |
133 | elif throttle_type == self.FILES_THROTTLE: | |
134 | # The default value of file limit is pretty permissive, so to avoid | |
135 | # the test running too fast, create lots of files and set the limit | |
136 | # pretty low. | |
137 | set_throttles(ops=100000000, files=6) | |
138 | size_unit = 1024 # small, numerous files | |
139 | file_multiplier = 200 | |
140 | else: | |
141 | raise NotImplemented(throttle_type) | |
142 | ||
143 | # Pick up config changes | |
144 | self.fs.mds_fail_restart() | |
145 | self.fs.wait_for_daemons() | |
146 | ||
147 | create_script = dedent(""" | |
148 | import os | |
149 | ||
150 | mount_path = "{mount_path}" | |
151 | subdir = "delete_me" | |
152 | size_unit = {size_unit} | |
153 | file_multiplier = {file_multiplier} | |
154 | os.mkdir(os.path.join(mount_path, subdir)) | |
155 | for i in xrange(0, file_multiplier): | |
156 | for size in xrange(0, {size_range}*size_unit, size_unit): | |
157 | filename = "{{0}}_{{1}}.bin".format(i, size / size_unit) | |
158 | f = open(os.path.join(mount_path, subdir, filename), 'w') | |
159 | f.write(size * 'x') | |
160 | f.close() | |
161 | """.format( | |
162 | mount_path=self.mount_a.mountpoint, | |
163 | size_unit=size_unit, | |
164 | file_multiplier=file_multiplier, | |
165 | size_range=self.throttle_workload_size_range | |
166 | )) | |
167 | ||
168 | self.mount_a.run_python(create_script) | |
169 | ||
170 | # We will run the deletion in the background, to reduce the risk of it completing before | |
171 | # we have started monitoring the stray statistics. | |
172 | def background(): | |
173 | self.mount_a.run_shell(["rm", "-rf", "delete_me"]) | |
174 | self.fs.mds_asok(["flush", "journal"]) | |
175 | ||
176 | background_thread = gevent.spawn(background) | |
177 | ||
178 | total_inodes = file_multiplier * self.throttle_workload_size_range + 1 | |
179 | mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds')) | |
180 | mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds')) | |
181 | ||
182 | # During this phase we look for the concurrent ops to exceed half | |
183 | # the limit (a heuristic) and not exceed the limit (a correctness | |
184 | # condition). | |
185 | purge_timeout = 600 | |
186 | elapsed = 0 | |
187 | files_high_water = 0 | |
188 | ops_high_water = 0 | |
189 | ||
190 | while True: | |
191 | stats = self.fs.mds_asok(['perf', 'dump']) | |
192 | mdc_stats = stats['mds_cache'] | |
193 | pq_stats = stats['purge_queue'] | |
194 | if elapsed >= purge_timeout: | |
195 | raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats)) | |
196 | ||
197 | num_strays = mdc_stats['num_strays'] | |
198 | num_strays_purging = pq_stats['pq_executing'] | |
199 | num_purge_ops = pq_stats['pq_executing_ops'] | |
92f5a8d4 TL |
200 | files_high_water = pq_stats['pq_executing_high_water'] |
201 | ops_high_water = pq_stats['pq_executing_ops_high_water'] | |
7c673cae | 202 | |
92f5a8d4 | 203 | self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops, files_high_water, ops_high_water]) |
7c673cae FG |
204 | |
205 | total_strays_created = mdc_stats['strays_created'] | |
206 | total_strays_purged = pq_stats['pq_executed'] | |
207 | ||
208 | if total_strays_purged == total_inodes: | |
209 | log.info("Complete purge in {0} seconds".format(elapsed)) | |
210 | break | |
211 | elif total_strays_purged > total_inodes: | |
212 | raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats)) | |
213 | else: | |
214 | if throttle_type == self.OPS_THROTTLE: | |
215 | # 11 is filer_max_purge_ops plus one for the backtrace: | |
216 | # limit is allowed to be overshot by this much. | |
217 | if num_purge_ops > mds_max_purge_ops + 11: | |
218 | raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format( | |
219 | num_purge_ops, mds_max_purge_ops | |
220 | )) | |
221 | elif throttle_type == self.FILES_THROTTLE: | |
222 | if num_strays_purging > mds_max_purge_files: | |
223 | raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format( | |
224 | num_strays_purging, mds_max_purge_files | |
225 | )) | |
226 | else: | |
227 | raise NotImplemented(throttle_type) | |
228 | ||
229 | log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format( | |
230 | num_strays_purging, num_strays, | |
231 | total_strays_purged, total_strays_created | |
232 | )) | |
233 | time.sleep(1) | |
234 | elapsed += 1 | |
235 | ||
236 | background_thread.join() | |
237 | ||
238 | # Check that we got up to a respectable rate during the purge. This is totally | |
239 | # racy, but should be safeish unless the cluster is pathologically slow, or | |
240 | # insanely fast such that the deletions all pass before we have polled the | |
241 | # statistics. | |
242 | if throttle_type == self.OPS_THROTTLE: | |
243 | if ops_high_water < mds_max_purge_ops / 2: | |
244 | raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format( | |
245 | ops_high_water, mds_max_purge_ops | |
246 | )) | |
92f5a8d4 TL |
247 | # The MDS may go over mds_max_purge_ops for some items, like a |
248 | # heavily fragmented directory. The throttle does not kick in | |
249 | # until *after* we reach or exceed the limit. This is expected | |
250 | # because we don't want to starve the PQ or never purge a | |
251 | # particularly large file/directory. | |
252 | self.assertLessEqual(ops_high_water, mds_max_purge_ops+64) | |
7c673cae FG |
253 | elif throttle_type == self.FILES_THROTTLE: |
254 | if files_high_water < mds_max_purge_files / 2: | |
255 | raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format( | |
92f5a8d4 | 256 | files_high_water, mds_max_purge_files |
7c673cae | 257 | )) |
92f5a8d4 | 258 | self.assertLessEqual(files_high_water, mds_max_purge_files) |
7c673cae FG |
259 | |
260 | # Sanity check all MDC stray stats | |
261 | stats = self.fs.mds_asok(['perf', 'dump']) | |
262 | mdc_stats = stats['mds_cache'] | |
263 | pq_stats = stats['purge_queue'] | |
264 | self.assertEqual(mdc_stats['num_strays'], 0) | |
265 | self.assertEqual(mdc_stats['num_strays_delayed'], 0) | |
266 | self.assertEqual(pq_stats['pq_executing'], 0) | |
267 | self.assertEqual(pq_stats['pq_executing_ops'], 0) | |
268 | self.assertEqual(mdc_stats['strays_created'], total_inodes) | |
269 | self.assertEqual(mdc_stats['strays_enqueued'], total_inodes) | |
270 | self.assertEqual(pq_stats['pq_executed'], total_inodes) | |
271 | ||
272 | def get_mdc_stat(self, name, mds_id=None): | |
273 | return self.get_stat("mds_cache", name, mds_id) | |
274 | ||
275 | def get_stat(self, subsys, name, mds_id=None): | |
276 | return self.fs.mds_asok(['perf', 'dump', subsys, name], | |
277 | mds_id=mds_id)[subsys][name] | |
278 | ||
279 | def _wait_for_counter(self, subsys, counter, expect_val, timeout=60, | |
280 | mds_id=None): | |
281 | self.wait_until_equal( | |
282 | lambda: self.get_stat(subsys, counter, mds_id), | |
283 | expect_val=expect_val, timeout=timeout, | |
284 | reject_fn=lambda x: x > expect_val | |
285 | ) | |
286 | ||
    def test_open_inode(self):
        """
        That the case of a dentry unlinked while a client holds an
        inode open is handled correctly.

        The inode should be moved into a stray dentry, while the original
        dentry and directory should be purged.

        The inode's data should be purged when the client eventually closes
        it.
        """
        mount_a_client_id = self.mount_a.get_global_id()

        # Write some bytes to a file
        size_mb = 8

        # Hold the file open
        p = self.mount_a.open_background("open_file")
        self.mount_a.write_n_mb("open_file", size_mb)
        open_file_ino = self.mount_a.path_to_ino("open_file")

        # Expect 2 caps held — presumably the root dir plus open_file;
        # TODO confirm against cap accounting.
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # Unlink the dentry
        self.mount_a.run_shell(["rm", "-f", "open_file"])

        # Wait to see the stray count increment
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)

        # See that while the stray count has incremented, none have passed
        # on to the purge queue
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that the client still holds 2 caps
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # See that the data objects remain in the data pool
        self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))

        # Now close the file
        self.mount_a.kill_background(p)

        # Wait to see the client cap count decrement
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
        )
        # Wait to see the purge counter increment, stray count go to zero
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=6, reject_fn=lambda x: x > 1
        )
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        # See that the data objects no longer exist
        self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))

        self.await_data_pool_empty()
349 | ||
    def test_hardlink_reintegration(self):
        """
        That removal of primary dentry of hardlinked inode results
        in reintegration of inode into the previously-remote dentry,
        rather than lingering as a stray indefinitely.
        """
        # Write some bytes to file_a
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "dir_1"])
        self.mount_a.write_n_mb("dir_1/file_a", size_mb)
        ino = self.mount_a.path_to_ino("dir_1/file_a")

        # Create a hardlink named file_b
        self.mount_a.run_shell(["mkdir", "dir_2"])
        self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
        self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)

        # Flush journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that backtrace for the file points to the file_a path
        pre_unlink_bt = self.fs.read_backtrace(ino)
        self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")

        # empty mds cache. otherwise mds reintegrates stray when unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount()

        # Unlink file_a
        self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])

        # See that a stray was created
        self.assertEqual(self.get_mdc_stat("num_strays"), 1)
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)

        # Wait, see that data objects are still present (i.e. that the
        # stray did not advance to purging given time)
        time.sleep(30)
        self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that before reintegration, the inode's backtrace points to a stray dir
        self.fs.mds_asok(['flush', 'journal'])
        self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))

        # Baseline for the reintegration counter, compared after each phase.
        last_reintegrated = self.get_mdc_stat("strays_reintegrated")

        # Do a metadata operation on the remaining link (mv is heavy handed, but
        # others like touch may be satisfied from caps without poking MDS)
        self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the eval_remote call
        # on responding to a client request.
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the remaining link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")

        # mds should reintegrates stray when unlink finishes
        self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the notify_stray call
        # on completion of unlink
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the newest link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")

        # Now really delete it
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()
        self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))

        # We caused the inode to go stray 3 times
        self.assertEqual(self.get_mdc_stat("strays_created"), 3)
        # We purged it at the last
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)
460 | ||
    def test_mv_hardlink_cleanup(self):
        """
        That when doing a rename from A to B, and B has hardlinks,
        then we make a stray for B which is then reintegrated
        into one of his hardlinks.
        """
        # Create file_a, file_b, and a hardlink to file_b
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")

        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
        self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)

        # mv file_a file_b: clobbers file_b, whose inode survives via linkto_b
        self.mount_a.run_shell(["mv", "file_a", "file_b"])

        # Stray reintegration should happen as a result of the notify_stray call on
        # completion of rename
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)

        # No data objects should have been deleted, as both files still have linkage.
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
        self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))

        self.fs.mds_asok(['flush', 'journal'])

        # After reintegration the backtrace should name the surviving link
        post_reint_bt = self.fs.read_backtrace(file_b_ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")
7c673cae FG |
500 | |
501 | def _setup_two_ranks(self): | |
502 | # Set up two MDSs | |
7c673cae FG |
503 | self.fs.set_max_mds(2) |
504 | ||
505 | # See that we have two active MDSs | |
506 | self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30, | |
507 | reject_fn=lambda v: v > 2 or v < 1) | |
508 | ||
509 | active_mds_names = self.fs.get_active_names() | |
510 | rank_0_id = active_mds_names[0] | |
511 | rank_1_id = active_mds_names[1] | |
512 | log.info("Ranks 0 and 1 are {0} and {1}".format( | |
513 | rank_0_id, rank_1_id)) | |
514 | ||
515 | # Get rid of other MDS daemons so that it's easier to know which | |
516 | # daemons to expect in which ranks after restarts | |
517 | for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}: | |
518 | self.mds_cluster.mds_stop(unneeded_mds) | |
519 | self.mds_cluster.mds_fail(unneeded_mds) | |
520 | ||
521 | return rank_0_id, rank_1_id | |
522 | ||
    def _force_migrate(self, to_id, path, watch_ino):
        """
        Pin `path` to rank 1 and poll until `watch_ino` shows up as auth
        in the target daemon's cache.

        :param to_id: MDS id to move it to
        :param path: Filesystem path (string) to move
        :param watch_ino: Inode number to look for at destination to confirm move
        :return: None
        """
        # NOTE(review): the pin value is hard-coded to "1" while to_id is
        # only used as the asok target below — confirm to_id always holds
        # rank 1 at the call sites.
        self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path])

        # Poll the MDS cache dump to watch for the export completing
        migrated = False
        migrate_timeout = 60
        migrate_elapsed = 0
        while not migrated:
            data = self.fs.mds_asok(["dump", "cache"], to_id)
            for inode_data in data:
                if inode_data['ino'] == watch_ino:
                    log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
                    if inode_data['is_auth'] is True:
                        migrated = True
                    break

            if not migrated:
                if migrate_elapsed > migrate_timeout:
                    raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed))
                else:
                    migrate_elapsed += 1
                    time.sleep(1)
551 | ||
552 | def _is_stopped(self, rank): | |
553 | mds_map = self.fs.get_mds_map() | |
554 | return rank not in [i['rank'] for i in mds_map['info'].values()] | |
555 | ||
    def test_purge_on_shutdown(self):
        """
        That when an MDS rank is shut down, its purge queue is
        drained in the process.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Block purging on rank 1 by setting its file throttle to zero
        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 5

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        self.mount_a.umount_wait()

        # See all the strays go into purge queue
        self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
        self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
        self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)

        # See nothing get purged from the purge queue (yet)
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)

        # Shut down rank 1
        self.fs.set_max_mds(1)

        # It shouldn't proceed past stopping because its still not allowed
        # to purge
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
        self.assertFalse(self._is_stopped(1))

        # Permit the daemon to start purging again
        self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
                                            'injectargs',
                                            "--mds_max_purge_files 100")

        # It should now proceed through shutdown
        self.fs.wait_for_daemons(timeout=120)

        # ...and in the process purge all that data
        self.await_data_pool_empty()
605 | ||
    def test_migration_on_shutdown(self):
        """
        That when an MDS rank is shut down, any non-purgeable strays
        get migrated to another rank.
        """

        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Create a non-purgeable stray in a ~mds1 stray directory
        # by doing a hard link and deleting the original file
        self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"])
        self.mount_a.run_shell(["touch", "dir_1/original"])
        self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"])

        self._force_migrate(rank_1_id, "dir_1",
                            self.mount_a.path_to_ino("dir_1/original"))

        # empty mds cache. otherwise mds reintegrates stray when unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'], rank_0_id)
        self.fs.mds_asok(['flush', 'journal'], rank_1_id)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        # Daemons may have changed identity after the restart: re-resolve ids
        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]

        self.mount_a.mount()

        self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
        self.mount_a.umount_wait()

        self._wait_for_counter("mds_cache", "strays_created", 1,
                               mds_id=rank_1_id)

        # Shut down rank 1
        self.fs.set_max_mds(1)
        self.fs.wait_for_daemons(timeout=120)

        # See that the stray counter on rank 0 has incremented
        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)
648 | ||
649 | def assert_backtrace(self, ino, expected_path): | |
650 | """ | |
651 | Assert that the backtrace in the data pool for an inode matches | |
652 | an expected /foo/bar path. | |
653 | """ | |
654 | expected_elements = expected_path.strip("/").split("/") | |
655 | bt = self.fs.read_backtrace(ino) | |
656 | actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']])) | |
657 | self.assertListEqual(expected_elements, actual_elements) | |
658 | ||
659 | def get_backtrace_path(self, ino): | |
660 | bt = self.fs.read_backtrace(ino) | |
661 | elements = reversed([dn['dname'] for dn in bt['ancestors']]) | |
662 | return "/".join(elements) | |
663 | ||
664 | def assert_purge_idle(self): | |
665 | """ | |
666 | Assert that the MDS perf counters indicate no strays exist and | |
667 | no ongoing purge activity. Sanity check for when PurgeQueue should | |
668 | be idle. | |
669 | """ | |
670 | mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache'] | |
671 | pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue'] | |
672 | self.assertEqual(mdc_stats["num_strays"], 0) | |
673 | self.assertEqual(mdc_stats["num_strays_delayed"], 0) | |
674 | self.assertEqual(pq_stats["pq_executing"], 0) | |
675 | self.assertEqual(pq_stats["pq_executing_ops"], 0) | |
676 | ||
    def test_mv_cleanup(self):
        """
        That when doing a rename from A to B, and B has no hardlinks,
        then we make a stray for B and purge him.
        """
        # Create file_a and file_b, write some to both
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")
        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_a")
        self.assert_backtrace(file_b_ino, "file_b")

        # mv file_a file_b: file_b's inode loses its last link
        self.mount_a.run_shell(['mv', 'file_a', 'file_b'])

        # See that stray counter increments
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        # Wait for purge counter to increment
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()

        # file_b should have been purged
        self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))

        # Backtrace should have updated from file_a to file_b
        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_b")

        # file_a's data should still exist
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
713 | ||
714 | def _pool_df(self, pool_name): | |
715 | """ | |
716 | Return a dict like | |
717 | { | |
718 | "kb_used": 0, | |
719 | "bytes_used": 0, | |
720 | "max_avail": 19630292406, | |
721 | "objects": 0 | |
722 | } | |
723 | ||
724 | :param pool_name: Which pool (must exist) | |
725 | """ | |
726 | out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty") | |
727 | for p in json.loads(out)['pools']: | |
728 | if p['name'] == pool_name: | |
729 | return p['stats'] | |
730 | ||
731 | raise RuntimeError("Pool '{0}' not found".format(pool_name)) | |
732 | ||
733 | def await_data_pool_empty(self): | |
734 | self.wait_until_true( | |
735 | lambda: self._pool_df( | |
736 | self.fs.get_data_pool_name() | |
737 | )['objects'] == 0, | |
738 | timeout=60) | |
739 | ||
    def test_snapshot_remove(self):
        """
        That removal of a snapshot that references a now-unlinked file results
        in purging on the stray for the file.

        Sequence: create a file, snapshot its parent, overwrite the file so
        head deviates from the snap, unlink it, then remove the snapshot and
        verify the file's data objects are finally purged from the data pool.
        """
        # Enable snapshots
        self.fs.set_allow_new_snaps(True)

        # Create a dir with a file in it
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "snapdir"])
        self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
        self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
        file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")

        # Snapshot the dir
        self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])

        # Cause the head revision to deviate from the snapshot
        self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)

        # Flush the journal so that backtraces, dirfrag objects will actually be written
        self.fs.mds_asok(["flush", "journal"])

        # Unlink the file (and its parent dir) while the snapshot still
        # references them: this should create strays rather than purge.
        self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
        self.mount_a.run_shell(["rmdir", "snapdir/subdir"])

        # Unmount the client because when I come back to check the data is still
        # in the file I don't want to just see what's in the page cache.
        self.mount_a.umount_wait()

        # One stray each for file_a and subdir.
        self.assertEqual(self.get_mdc_stat("strays_created"), 2)

        # FIXME: at this stage we see a purge and the stray count drops to
        # zero, but there's actually still a stray, so at the very
        # least the StrayManager stats code is slightly off

        self.mount_a.mount()

        # See that the data from the snapshotted revision of the file is still present
        # and correct
        self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)

        # Remove the snapshot
        self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])

        # Purging file_a doesn't happen until after we've flushed the journal, because
        # it is referenced by the snapshotted subdir, and the snapshot isn't really
        # gone until the journal references to it are gone
        self.fs.mds_asok(["flush", "journal"])

        # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs.
        # See also: http://tracker.ceph.com/issues/20072
        self.wait_until_true(
            lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024),
            timeout=60
        )

        # See that a purge happens now
        self._wait_for_counter("mds_cache", "strays_enqueued", 2)
        self._wait_for_counter("purge_queue", "pq_executed", 2)

        self.await_data_pool_empty()
804 | ||
805 | def test_fancy_layout(self): | |
806 | """ | |
807 | purge stray file with fancy layout | |
808 | """ | |
809 | ||
810 | file_name = "fancy_layout_file" | |
811 | self.mount_a.run_shell(["touch", file_name]) | |
812 | ||
813 | file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608" | |
814 | self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout) | |
815 | ||
816 | # 35MB requires 7 objects | |
817 | size_mb = 35 | |
818 | self.mount_a.write_n_mb(file_name, size_mb) | |
819 | ||
820 | self.mount_a.run_shell(["rm", "-f", file_name]) | |
821 | self.fs.mds_asok(["flush", "journal"]) | |
822 | ||
823 | # can't use self.fs.data_objects_absent here, it does not support fancy layout | |
824 | self.await_data_pool_empty() | |
825 | ||
    def test_dirfrag_limit(self):
        """
        That the directory fragment size cannot exceed mds_bal_fragment_size_max (using a limit of 50 in all configurations).

        That fragmentation (forced) will allow more entries to be created.

        That unlinking fails when the stray directory fragment becomes too large and that unlinking may continue once those strays are purged.
        """

        # Apply the low fragment limit to every MDS so it holds regardless
        # of which daemon ends up active.
        LOW_LIMIT = 50
        for mds in self.fs.get_daemon_names():
            self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds)

        # Phase 1: creating LOW_LIMIT+1 entries in one fragment must fail.
        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir")
                os.mkdir(path)
                for n in range(0, {file_count}):
                    open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
                """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT+1
            )))
        except CommandFailedError:
            pass # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        # Now test that we can go beyond the limit if we fragment the directory

        # Fill subdir2 exactly to the limit; fsync the dir so the entries
        # are stable before we force a split.
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            os.mkdir(path)
            for n in range(0, {file_count}):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            dfd = os.open(path, os.O_DIRECTORY)
            os.fsync(dfd)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

        # Ensure that subdir2 is fragmented
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id)

        # remount+flush (release client caps)
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # Create 50% more files than the current fragment limit
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            for n in range({file_count}, ({file_count}*3)//2):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

        # Now test the stray directory size is limited and recovers
        strays_before = self.get_mdc_stat("strays_created")
        try:
            # Create-and-unlink in a loop so unlinked files pile up as
            # strays; expect to hit the fragment limit before finishing.
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir3")
                os.mkdir(path)
                for n in range({file_count}):
                    fpath = os.path.join(path, "%s" % n)
                    f = open(fpath, 'w')
                    f.write("%s" % n)
                    f.close()
                    os.unlink(fpath)
                """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT*10 # 10 stray directories, should collide before this count
            )))
        except CommandFailedError:
            pass # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        # At least one full fragment's worth of strays must have been created.
        strays_after = self.get_mdc_stat("strays_created")
        self.assertGreaterEqual(strays_after-strays_before, LOW_LIMIT)

        # Wait for all strays to be enqueued and purged...
        self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
        self._wait_for_counter("purge_queue", "pq_executed", strays_after)

        # ...after which create/unlink within the limit should succeed again.
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir4")
            os.mkdir(path)
            for n in range({file_count}):
                fpath = os.path.join(path, "%s" % n)
                f = open(fpath, 'w')
                f.write("%s" % n)
                f.close()
                os.unlink(fpath)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))
933 | ||
    def test_purge_queue_upgrade(self):
        """
        That when starting on a system with no purge queue in the metadata
        pool, we silently create one.
        :return:
        """

        # Take the MDS cluster fully offline before touching RADOS objects.
        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
        # Remove object 500.00000000 — per the docstring, this simulates a
        # pre-purge-queue filesystem (presumably the purge queue journal's
        # header object; confirm against the MDS journaler layout).
        self.fs.rados(["rm", "500.00000000"])
        # Restarting must succeed despite the missing object.
        self.mds_cluster.mds_restart()
        self.fs.wait_for_daemons()
946 | ||
224ce89b WB |
    def test_replicated_delete_speed(self):
        """
        That deletions of replicated metadata are not pathologically slow
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        # NOTE(review): mds_max_purge_files=0 presumably disables file
        # purging on rank 1 so it cannot influence the timing — confirm.
        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 10

        self.mount_a.create_n_files("delete_me/file", file_count)

        # Move the directory's authority to rank 1 (helper defined elsewhere
        # in this class) so the metadata is replicated across ranks.
        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        # Time the bulk unlink via the client.
        begin = datetime.datetime.now()
        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        end = datetime.datetime.now()

        # What we're really checking here is that we are completing client
        # operations immediately rather than delaying until the next tick.
        tick_period = float(self.fs.get_config("mds_tick_interval",
                                               service_type="mds"))

        # Deletions must finish in well under one tick per file; otherwise
        # each unlink is stalling until the next MDS tick.
        duration = (end - begin).total_seconds()
        self.assertLess(duration, (file_count * tick_period) * 0.25)
975 |