]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | import json |
2 | import time | |
3 | import logging | |
4 | from textwrap import dedent | |
5 | import datetime | |
6 | import gevent | |
7 | from teuthology.orchestra.run import CommandFailedError, Raw | |
8 | from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology | |
9 | ||
10 | log = logging.getLogger(__name__) | |
11 | ||
12 | ||
class TestStrays(CephFSTestCase):
    # Two daemons needed: the shutdown/migration tests run with two active ranks.
    MDSS_REQUIRED = 2

    # Selector constants for _test_throttling: which purge throttle to exercise.
    OPS_THROTTLE = 1
    FILES_THROTTLE = 2

    # Range of different file sizes used in throttle test's workload
    throttle_workload_size_range = 16
21 | ||
    @for_teuthology
    def test_ops_throttle(self):
        """
        Long-running: exercise the purge throttle on RADOS operation count.
        """
        self._test_throttling(self.OPS_THROTTLE)
25 | ||
    @for_teuthology
    def test_files_throttle(self):
        """
        Long-running: exercise the purge throttle on concurrent file count.
        """
        self._test_throttling(self.FILES_THROTTLE)
29 | ||
    def test_dir_deletion(self):
        """
        That when deleting a bunch of dentries and the containing
        directory, everything gets purged.
        Catches cases where the client might e.g. fail to trim
        the unlinked dir from its cache.
        """
        file_count = 1000
        # Script is executed on the client mount; note it is written for the
        # remote interpreter (uses xrange) -- do not modernize without
        # confirming the client-side Python version.
        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size = {size}
            file_count = {file_count}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_count):
                filename = "{{0}}_{{1}}.bin".format(i, size)
                f = open(os.path.join(mount_path, subdir, filename), 'w')
                f.write(size * 'x')
                f.close()
            """.format(
            mount_path=self.mount_a.mountpoint,
            size=1024,
            file_count=file_count
        ))

        self.mount_a.run_python(create_script)

        # That the dirfrag object is created
        self.fs.mds_asok(["flush", "journal"])
        dir_ino = self.mount_a.path_to_ino("delete_me")
        self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))

        # Remove everything
        self.mount_a.run_shell(["rm", "-rf", "delete_me"])
        self.fs.mds_asok(["flush", "journal"])

        # That all the removed files get created as strays
        # (+1 accounts for the directory inode itself)
        strays = self.get_mdc_stat("strays_created")
        self.assertEqual(strays, file_count + 1)

        # That the strays all get enqueued for purge
        self.wait_until_equal(
            lambda: self.get_mdc_stat("strays_enqueued"),
            strays,
            timeout=600
        )

        # That all the purge operations execute
        self.wait_until_equal(
            lambda: self.get_stat("purge_queue", "pq_executed"),
            strays,
            timeout=600
        )

        # That finally, the directory metadata object is gone
        self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))

        # That finally, the data objects are all gone
        self.await_data_pool_empty()
92 | ||
93 | def _test_throttling(self, throttle_type): | |
94 | self.data_log = [] | |
95 | try: | |
96 | return self._do_test_throttling(throttle_type) | |
97 | except: | |
98 | for l in self.data_log: | |
99 | log.info(",".join([l_.__str__() for l_ in l])) | |
100 | raise | |
101 | ||
102 | def _do_test_throttling(self, throttle_type): | |
103 | """ | |
104 | That the mds_max_purge_ops setting is respected | |
105 | """ | |
106 | ||
107 | def set_throttles(files, ops): | |
108 | """ | |
109 | Helper for updating ops/files limits, and calculating effective | |
110 | ops_per_pg setting to give the same ops limit. | |
111 | """ | |
112 | self.set_conf('mds', 'mds_max_purge_files', "%d" % files) | |
113 | self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops) | |
114 | ||
115 | pgs = self.fs.mon_manager.get_pool_property( | |
116 | self.fs.get_data_pool_name(), | |
117 | "pg_num" | |
118 | ) | |
119 | ops_per_pg = float(ops) / pgs | |
120 | self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg) | |
121 | ||
122 | # Test conditions depend on what we're going to be exercising. | |
123 | # * Lift the threshold on whatever throttle we are *not* testing, so | |
124 | # that the throttle of interest is the one that will be the bottleneck | |
125 | # * Create either many small files (test file count throttling) or fewer | |
126 | # large files (test op throttling) | |
127 | if throttle_type == self.OPS_THROTTLE: | |
128 | set_throttles(files=100000000, ops=16) | |
129 | size_unit = 1024 * 1024 # big files, generate lots of ops | |
130 | file_multiplier = 100 | |
131 | elif throttle_type == self.FILES_THROTTLE: | |
132 | # The default value of file limit is pretty permissive, so to avoid | |
133 | # the test running too fast, create lots of files and set the limit | |
134 | # pretty low. | |
135 | set_throttles(ops=100000000, files=6) | |
136 | size_unit = 1024 # small, numerous files | |
137 | file_multiplier = 200 | |
138 | else: | |
139 | raise NotImplemented(throttle_type) | |
140 | ||
141 | # Pick up config changes | |
142 | self.fs.mds_fail_restart() | |
143 | self.fs.wait_for_daemons() | |
144 | ||
145 | create_script = dedent(""" | |
146 | import os | |
147 | ||
148 | mount_path = "{mount_path}" | |
149 | subdir = "delete_me" | |
150 | size_unit = {size_unit} | |
151 | file_multiplier = {file_multiplier} | |
152 | os.mkdir(os.path.join(mount_path, subdir)) | |
153 | for i in xrange(0, file_multiplier): | |
154 | for size in xrange(0, {size_range}*size_unit, size_unit): | |
155 | filename = "{{0}}_{{1}}.bin".format(i, size / size_unit) | |
156 | f = open(os.path.join(mount_path, subdir, filename), 'w') | |
157 | f.write(size * 'x') | |
158 | f.close() | |
159 | """.format( | |
160 | mount_path=self.mount_a.mountpoint, | |
161 | size_unit=size_unit, | |
162 | file_multiplier=file_multiplier, | |
163 | size_range=self.throttle_workload_size_range | |
164 | )) | |
165 | ||
166 | self.mount_a.run_python(create_script) | |
167 | ||
168 | # We will run the deletion in the background, to reduce the risk of it completing before | |
169 | # we have started monitoring the stray statistics. | |
170 | def background(): | |
171 | self.mount_a.run_shell(["rm", "-rf", "delete_me"]) | |
172 | self.fs.mds_asok(["flush", "journal"]) | |
173 | ||
174 | background_thread = gevent.spawn(background) | |
175 | ||
176 | total_inodes = file_multiplier * self.throttle_workload_size_range + 1 | |
177 | mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds')) | |
178 | mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds')) | |
179 | ||
180 | # During this phase we look for the concurrent ops to exceed half | |
181 | # the limit (a heuristic) and not exceed the limit (a correctness | |
182 | # condition). | |
183 | purge_timeout = 600 | |
184 | elapsed = 0 | |
185 | files_high_water = 0 | |
186 | ops_high_water = 0 | |
187 | ||
188 | while True: | |
189 | stats = self.fs.mds_asok(['perf', 'dump']) | |
190 | mdc_stats = stats['mds_cache'] | |
191 | pq_stats = stats['purge_queue'] | |
192 | if elapsed >= purge_timeout: | |
193 | raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats)) | |
194 | ||
195 | num_strays = mdc_stats['num_strays'] | |
196 | num_strays_purging = pq_stats['pq_executing'] | |
197 | num_purge_ops = pq_stats['pq_executing_ops'] | |
198 | ||
199 | self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops]) | |
200 | ||
201 | files_high_water = max(files_high_water, num_strays_purging) | |
202 | ops_high_water = max(ops_high_water, num_purge_ops) | |
203 | ||
204 | total_strays_created = mdc_stats['strays_created'] | |
205 | total_strays_purged = pq_stats['pq_executed'] | |
206 | ||
207 | if total_strays_purged == total_inodes: | |
208 | log.info("Complete purge in {0} seconds".format(elapsed)) | |
209 | break | |
210 | elif total_strays_purged > total_inodes: | |
211 | raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats)) | |
212 | else: | |
213 | if throttle_type == self.OPS_THROTTLE: | |
214 | # 11 is filer_max_purge_ops plus one for the backtrace: | |
215 | # limit is allowed to be overshot by this much. | |
216 | if num_purge_ops > mds_max_purge_ops + 11: | |
217 | raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format( | |
218 | num_purge_ops, mds_max_purge_ops | |
219 | )) | |
220 | elif throttle_type == self.FILES_THROTTLE: | |
221 | if num_strays_purging > mds_max_purge_files: | |
222 | raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format( | |
223 | num_strays_purging, mds_max_purge_files | |
224 | )) | |
225 | else: | |
226 | raise NotImplemented(throttle_type) | |
227 | ||
228 | log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format( | |
229 | num_strays_purging, num_strays, | |
230 | total_strays_purged, total_strays_created | |
231 | )) | |
232 | time.sleep(1) | |
233 | elapsed += 1 | |
234 | ||
235 | background_thread.join() | |
236 | ||
237 | # Check that we got up to a respectable rate during the purge. This is totally | |
238 | # racy, but should be safeish unless the cluster is pathologically slow, or | |
239 | # insanely fast such that the deletions all pass before we have polled the | |
240 | # statistics. | |
241 | if throttle_type == self.OPS_THROTTLE: | |
242 | if ops_high_water < mds_max_purge_ops / 2: | |
243 | raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format( | |
244 | ops_high_water, mds_max_purge_ops | |
245 | )) | |
246 | elif throttle_type == self.FILES_THROTTLE: | |
247 | if files_high_water < mds_max_purge_files / 2: | |
248 | raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format( | |
249 | ops_high_water, mds_max_purge_files | |
250 | )) | |
251 | ||
252 | # Sanity check all MDC stray stats | |
253 | stats = self.fs.mds_asok(['perf', 'dump']) | |
254 | mdc_stats = stats['mds_cache'] | |
255 | pq_stats = stats['purge_queue'] | |
256 | self.assertEqual(mdc_stats['num_strays'], 0) | |
257 | self.assertEqual(mdc_stats['num_strays_delayed'], 0) | |
258 | self.assertEqual(pq_stats['pq_executing'], 0) | |
259 | self.assertEqual(pq_stats['pq_executing_ops'], 0) | |
260 | self.assertEqual(mdc_stats['strays_created'], total_inodes) | |
261 | self.assertEqual(mdc_stats['strays_enqueued'], total_inodes) | |
262 | self.assertEqual(pq_stats['pq_executed'], total_inodes) | |
263 | ||
    def get_mdc_stat(self, name, mds_id=None):
        """
        Convenience wrapper: read a single mds_cache perf counter.
        """
        return self.get_stat("mds_cache", name, mds_id)
266 | ||
267 | def get_stat(self, subsys, name, mds_id=None): | |
268 | return self.fs.mds_asok(['perf', 'dump', subsys, name], | |
269 | mds_id=mds_id)[subsys][name] | |
270 | ||
271 | def _wait_for_counter(self, subsys, counter, expect_val, timeout=60, | |
272 | mds_id=None): | |
273 | self.wait_until_equal( | |
274 | lambda: self.get_stat(subsys, counter, mds_id), | |
275 | expect_val=expect_val, timeout=timeout, | |
276 | reject_fn=lambda x: x > expect_val | |
277 | ) | |
278 | ||
    def test_open_inode(self):
        """
        That the case of a dentry unlinked while a client holds an
        inode open is handled correctly.

        The inode should be moved into a stray dentry, while the original
        dentry and directory should be purged.

        The inode's data should be purged when the client eventually closes
        it.
        """
        mount_a_client_id = self.mount_a.get_global_id()

        # Write some bytes to a file
        size_mb = 8

        # Hold the file open
        p = self.mount_a.open_background("open_file")
        self.mount_a.write_n_mb("open_file", size_mb)
        open_file_ino = self.mount_a.path_to_ino("open_file")

        # 2 caps: one for the root dir, one for the open file -- TODO confirm
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # Unlink the dentry
        self.mount_a.run_shell(["rm", "-f", "open_file"])

        # Wait to see the stray count increment
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)

        # See that while the stray count has incremented, none have passed
        # on to the purge queue
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that the client still holds 2 caps
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # See that the data objects remain in the data pool
        self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))

        # Now close the file
        self.mount_a.kill_background(p)

        # Wait to see the client cap count decrement
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
        )
        # Wait to see the purge counter increment, stray count go to zero
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=6, reject_fn=lambda x: x > 1
        )
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        # See that the data objects no longer exist
        self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))

        self.await_data_pool_empty()
341 | ||
    def test_hardlink_reintegration(self):
        """
        That removal of primary dentry of hardlinked inode results
        in reintegration of inode into the previously-remote dentry,
        rather than lingering as a stray indefinitely.
        """
        # Write some bytes to file_a
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "dir_1"])
        self.mount_a.write_n_mb("dir_1/file_a", size_mb)
        ino = self.mount_a.path_to_ino("dir_1/file_a")

        # Create a hardlink named file_b
        self.mount_a.run_shell(["mkdir", "dir_2"])
        self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
        self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)

        # Flush journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that backtrace for the file points to the file_a path
        pre_unlink_bt = self.fs.read_backtrace(ino)
        self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")

        # empty mds cache. otherwise mds reintegrates stray when unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount()

        # Unlink file_a
        self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])

        # See that a stray was created
        self.assertEqual(self.get_mdc_stat("num_strays"), 1)
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)

        # Wait, see that data objects are still present (i.e. that the
        # stray did not advance to purging given time)
        time.sleep(30)
        self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that before reintegration, the inode's backtrace points to a stray dir
        self.fs.mds_asok(['flush', 'journal'])
        self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")

        # Do a metadata operation on the remaining link (mv is heavy handed, but
        # others like touch may be satisfied from caps without poking MDS)
        self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the eval_remote call
        # on responding to a client request.
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the remaining link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")

        # mds should reintegrate stray when unlink finishes
        self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the notify_stray call
        # on completion of unlink
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the newest link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")

        # Now really delete it
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()
        self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))

        # We caused the inode to go stray 3 times
        self.assertEqual(self.get_mdc_stat("strays_created"), 3)
        # We purged it at the last
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)
452 | ||
    def test_mv_hardlink_cleanup(self):
        """
        That when doing a rename from A to B, and B has hardlinks,
        then we make a stray for B which is then reintegrated
        into one of his hardlinks.
        """
        # Create file_a, file_b, and a hardlink to file_b
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")

        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
        self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)

        # mv file_a file_b: clobbers file_b's primary dentry, making it a stray
        self.mount_a.run_shell(["mv", "file_a", "file_b"])

        # Stray reintegration should happen as a result of the notify_stray call on
        # completion of rename
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)

        # No data objects should have been deleted, as both files still have linkage.
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
        self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))

        self.fs.mds_asok(['flush', 'journal'])

        # After reintegration, file_b's inode lives at its surviving hardlink
        post_reint_bt = self.fs.read_backtrace(file_b_ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")
492 | |
493 | def _setup_two_ranks(self): | |
494 | # Set up two MDSs | |
495 | self.fs.set_allow_multimds(True) | |
496 | self.fs.set_max_mds(2) | |
497 | ||
498 | # See that we have two active MDSs | |
499 | self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30, | |
500 | reject_fn=lambda v: v > 2 or v < 1) | |
501 | ||
502 | active_mds_names = self.fs.get_active_names() | |
503 | rank_0_id = active_mds_names[0] | |
504 | rank_1_id = active_mds_names[1] | |
505 | log.info("Ranks 0 and 1 are {0} and {1}".format( | |
506 | rank_0_id, rank_1_id)) | |
507 | ||
508 | # Get rid of other MDS daemons so that it's easier to know which | |
509 | # daemons to expect in which ranks after restarts | |
510 | for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}: | |
511 | self.mds_cluster.mds_stop(unneeded_mds) | |
512 | self.mds_cluster.mds_fail(unneeded_mds) | |
513 | ||
514 | return rank_0_id, rank_1_id | |
515 | ||
    def _force_migrate(self, to_id, path, watch_ino):
        """
        Pin a directory to rank 1 and poll the destination MDS's cache
        until it reports itself auth for the watched inode.

        :param to_id: MDS id to move it to
        :param path: Filesystem path (string) to move
        :param watch_ino: Inode number to look for at destination to confirm move
        :return: None
        :raises RuntimeError: if migration does not complete within 60s
        """
        # NOTE(review): pin value "1" is hard-coded, so this assumes to_id is
        # the rank 1 daemon -- confirm against callers.
        self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path])

        # Poll the MDS cache dump to watch for the export completing
        migrated = False
        migrate_timeout = 60
        migrate_elapsed = 0
        while not migrated:
            data = self.fs.mds_asok(["dump", "cache"], to_id)
            for inode_data in data:
                if inode_data['ino'] == watch_ino:
                    log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2)))
                    if inode_data['is_auth'] is True:
                        migrated = True
                    # Stop scanning once the watched inode was found,
                    # whether or not it is auth yet.
                    break

            if not migrated:
                if migrate_elapsed > migrate_timeout:
                    raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed))
                else:
                    migrate_elapsed += 1
                    time.sleep(1)
544 | ||
545 | def _is_stopped(self, rank): | |
546 | mds_map = self.fs.get_mds_map() | |
547 | return rank not in [i['rank'] for i in mds_map['info'].values()] | |
548 | ||
    def test_purge_on_shutdown(self):
        """
        That when an MDS rank is shut down, its purge queue is
        drained in the process.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Forbid rank 1 from purging any files at all
        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 5

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        self.mount_a.umount_wait()

        # See all the strays go into purge queue
        self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id)
        self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id)
        self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0)

        # See nothing get purged from the purge queue (yet)
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)

        # Shut down rank 1
        self.fs.set_max_mds(1)
        self.fs.deactivate(1)

        # It shouldn't proceed past stopping because its still not allowed
        # to purge
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0)
        self.assertFalse(self._is_stopped(1))

        # Permit the daemon to start purging again
        self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id),
                                            'injectargs',
                                            "--mds_max_purge_files 100")

        # It should now proceed through shutdown
        self.wait_until_true(
            lambda: self._is_stopped(1),
            timeout=60
        )

        # ...and in the process purge all that data
        self.await_data_pool_empty()
602 | ||
    def test_migration_on_shutdown(self):
        """
        That when an MDS rank is shut down, any non-purgeable strays
        get migrated to another rank.
        """

        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Create a non-purgeable stray in a ~mds1 stray directory
        # by doing a hard link and deleting the original file
        self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"])
        self.mount_a.run_shell(["touch", "dir_1/original"])
        self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"])

        self._force_migrate(rank_1_id, "dir_1",
                            self.mount_a.path_to_ino("dir_1/original"))

        # empty mds cache. otherwise mds reintegrates stray when unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'], rank_0_id)
        self.fs.mds_asok(['flush', 'journal'], rank_1_id)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        # Daemon->rank assignment may have changed across the restart,
        # so re-read which daemons hold ranks 0 and 1.
        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]

        self.mount_a.mount()

        self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
        self.mount_a.umount_wait()

        self._wait_for_counter("mds_cache", "strays_created", 1,
                               mds_id=rank_1_id)

        # Shut down rank 1
        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "1")
        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'deactivate', "1")

        # Wait til we get to a single active MDS mdsmap state
        self.wait_until_true(lambda: self._is_stopped(1), timeout=120)

        # See that the stray counter on rank 0 has incremented
        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)
648 | ||
649 | def assert_backtrace(self, ino, expected_path): | |
650 | """ | |
651 | Assert that the backtrace in the data pool for an inode matches | |
652 | an expected /foo/bar path. | |
653 | """ | |
654 | expected_elements = expected_path.strip("/").split("/") | |
655 | bt = self.fs.read_backtrace(ino) | |
656 | actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']])) | |
657 | self.assertListEqual(expected_elements, actual_elements) | |
658 | ||
659 | def get_backtrace_path(self, ino): | |
660 | bt = self.fs.read_backtrace(ino) | |
661 | elements = reversed([dn['dname'] for dn in bt['ancestors']]) | |
662 | return "/".join(elements) | |
663 | ||
664 | def assert_purge_idle(self): | |
665 | """ | |
666 | Assert that the MDS perf counters indicate no strays exist and | |
667 | no ongoing purge activity. Sanity check for when PurgeQueue should | |
668 | be idle. | |
669 | """ | |
670 | mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache'] | |
671 | pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue'] | |
672 | self.assertEqual(mdc_stats["num_strays"], 0) | |
673 | self.assertEqual(mdc_stats["num_strays_delayed"], 0) | |
674 | self.assertEqual(pq_stats["pq_executing"], 0) | |
675 | self.assertEqual(pq_stats["pq_executing_ops"], 0) | |
676 | ||
    def test_mv_cleanup(self):
        """
        That when doing a rename from A to B, and B has no hardlinks,
        then we make a stray for B and purge him.
        """
        # Create file_a and file_b, write some to both
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")
        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_a")
        self.assert_backtrace(file_b_ino, "file_b")

        # mv file_a file_b: file_b's inode loses its last link
        self.mount_a.run_shell(['mv', 'file_a', 'file_b'])

        # See that stray counter increments
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        # Wait for purge counter to increment
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()

        # file_b should have been purged
        self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))

        # Backtrace should have updated from file_a to file_b
        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_b")

        # file_a's data should still exist
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
713 | ||
714 | def _pool_df(self, pool_name): | |
715 | """ | |
716 | Return a dict like | |
717 | { | |
718 | "kb_used": 0, | |
719 | "bytes_used": 0, | |
720 | "max_avail": 19630292406, | |
721 | "objects": 0 | |
722 | } | |
723 | ||
724 | :param pool_name: Which pool (must exist) | |
725 | """ | |
726 | out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty") | |
727 | for p in json.loads(out)['pools']: | |
728 | if p['name'] == pool_name: | |
729 | return p['stats'] | |
730 | ||
731 | raise RuntimeError("Pool '{0}' not found".format(pool_name)) | |
732 | ||
    def await_data_pool_empty(self):
        """
        Block until the data pool reports zero objects (i.e. all purges
        have actually deleted their data), or time out after 60s.
        """
        self.wait_until_true(
            lambda: self._pool_df(
                self.fs.get_data_pool_name()
            )['objects'] == 0,
            timeout=60)
739 | ||
    def test_snapshot_remove(self):
        """
        That removal of a snapshot that references a now-unlinked file results
        in purging on the stray for the file.
        """
        # Enable snapshots
        self.fs.mon_manager.raw_cluster_cmd("mds", "set", "allow_new_snaps", "true",
                                            "--yes-i-really-mean-it")

        # Create a dir with a file in it
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "snapdir"])
        self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
        self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
        file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")

        # Snapshot the dir
        self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])

        # Cause the head revision to deviate from the snapshot
        self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)

        # Flush the journal so that backtraces, dirfrag objects will actually be written
        self.fs.mds_asok(["flush", "journal"])

        # Unlink the file
        self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
        self.mount_a.run_shell(["rmdir", "snapdir/subdir"])

        # Unmount the client because when I come back to check the data is still
        # in the file I don't want to just see what's in the page cache.
        self.mount_a.umount_wait()

        # Two strays: the unlinked file and the removed subdir
        self.assertEqual(self.get_mdc_stat("strays_created"), 2)

        # FIXME: at this stage we see a purge and the stray count drops to
        # zero, but there's actually still a stray, so at the very
        # least the StrayManager stats code is slightly off

        self.mount_a.mount()

        # See that the data from the snapshotted revision of the file is still present
        # and correct
        self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024)

        # Remove the snapshot
        self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"])
        self.mount_a.umount_wait()

        # Purging file_a doesn't happen until after we've flushed the journal, because
        # it is referenced by the snapshotted subdir, and the snapshot isn't really
        # gone until the journal references to it are gone
        self.fs.mds_asok(["flush", "journal"])

        # See that a purge happens now
        self._wait_for_counter("mds_cache", "strays_enqueued", 2)
        self._wait_for_counter("purge_queue", "pq_executed", 2)

        self.assertTrue(self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024))
        self.await_data_pool_empty()
800 | ||
    def test_fancy_layout(self):
        """
        purge stray file with fancy layout

        A non-default (striped) layout exercises the purge path for files
        whose data spans multiple object sets.
        """

        file_name = "fancy_layout_file"
        self.mount_a.run_shell(["touch", file_name])

        # Non-default striping: 1MB stripe unit, 4 stripes, 8MB objects
        file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
        self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)

        # 35MB requires 7 objects
        size_mb = 35
        self.mount_a.write_n_mb(file_name, size_mb)

        self.mount_a.run_shell(["rm", "-f", file_name])
        self.fs.mds_asok(["flush", "journal"])

        # can't use self.fs.data_objects_absent here, it does not support fancy layout
        # -- instead just wait for the whole data pool to drain to zero objects.
        self.await_data_pool_empty()
821 | ||
    def test_dirfrag_limit(self):
        """
        That the directory fragment size cannot exceed mds_bal_fragment_size_max (using a limit of 50 in all configurations).

        That fragmentation (forced) will allow more entries to be created.

        That unlinking fails when the stray directory fragment becomes too large and that unlinking may continue once those strays are purged.
        """

        self.fs.set_allow_dirfrags(True)

        # Artificially small fragment limit so the test is fast
        LOW_LIMIT = 50
        for mds in self.fs.get_daemon_names():
            self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds)

        # Creating LOW_LIMIT+1 entries in one (unfragmented) dir must fail
        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir")
                os.mkdir(path)
                for n in range(0, {file_count}):
                    open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
                """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT+1
            )))
        except CommandFailedError:
            pass # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        # Now test that we can go beyond the limit if we fragment the directory

        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            os.mkdir(path)
            for n in range(0, {file_count}):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            dfd = os.open(path, os.O_DIRECTORY)
            os.fsync(dfd)
            """.format(
        path=self.mount_a.mountpoint,
        file_count=LOW_LIMIT
        )))

        # Ensure that subdir2 is fragmented
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id)

        # remount+flush (release client caps)
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # Create 50% more files than the current fragment limit
        # (possible now that the dir is split into two fragments)
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            for n in range({file_count}, ({file_count}*3)//2):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            """.format(
        path=self.mount_a.mountpoint,
        file_count=LOW_LIMIT
        )))

        # Now test the stray directory size is limited and recovers
        strays_before = self.get_mdc_stat("strays_created")
        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir3")
                os.mkdir(path)
                for n in range({file_count}):
                    fpath = os.path.join(path, "%s" % n)
                    f = open(fpath, 'w')
                    f.write("%s" % n)
                    f.close()
                    os.unlink(fpath)
                """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT*10 # 10 stray directories, should collide before this count
            )))
        except CommandFailedError:
            pass # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        # At least LOW_LIMIT strays must have been created before the
        # stray dir fragment filled up and unlink started failing
        strays_after = self.get_mdc_stat("strays_created")
        self.assertGreaterEqual(strays_after-strays_before, LOW_LIMIT)

        # Wait for all strays so far to be purged...
        self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
        self._wait_for_counter("purge_queue", "pq_executed", strays_after)

        # ...after which unlinking must work again
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir4")
            os.mkdir(path)
            for n in range({file_count}):
                fpath = os.path.join(path, "%s" % n)
                f = open(fpath, 'w')
                f.write("%s" % n)
                f.close()
                os.unlink(fpath)
            """.format(
        path=self.mount_a.mountpoint,
        file_count=LOW_LIMIT
        )))
931 | ||
    def test_purge_queue_upgrade(self):
        """
        That when starting on a system with no purge queue in the metadata
        pool, we silently create one.
        :return:
        """

        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
        # Delete the purge queue's journal object from the metadata pool
        # (presumably 500.00000000 is its header/first object -- journal
        # ino 0x500), simulating a system from before the purge queue
        # existed.
        self.fs.rados(["rm", "500.00000000"])
        self.mds_cluster.mds_restart()
        # If the daemons come back healthy, the MDS must have recreated
        # the missing purge queue on startup.
        self.fs.wait_for_daemons()
944 | ||
945 | def test_purge_queue_op_rate(self): | |
946 | """ | |
947 | A busy purge queue is meant to aggregate operations sufficiently | |
948 | that our RADOS ops to the metadata pool are not O(files). Check | |
949 | that that is so. | |
950 | :return: | |
951 | """ | |
952 | ||
953 | # For low rates of deletion, the rate of metadata ops actually | |
954 | # will be o(files), so to see the desired behaviour we have to give | |
955 | # the system a significant quantity, i.e. an order of magnitude | |
956 | # more than the number of files it will purge at one time. | |
957 | ||
958 | max_purge_files = 2 | |
959 | ||
31f18b77 | 960 | self.set_conf('mds', 'mds_bal_frag', 'false') |
7c673cae FG |
961 | self.set_conf('mds', 'mds_max_purge_files', "%d" % max_purge_files) |
962 | self.fs.mds_fail_restart() | |
963 | self.fs.wait_for_daemons() | |
964 | ||
965 | phase_1_files = 256 | |
966 | phase_2_files = 512 | |
967 | ||
968 | self.mount_a.run_shell(["mkdir", "phase1"]) | |
969 | self.mount_a.create_n_files("phase1/file", phase_1_files) | |
970 | ||
971 | self.mount_a.run_shell(["mkdir", "phase2"]) | |
972 | self.mount_a.create_n_files("phase2/file", phase_2_files) | |
973 | ||
974 | def unlink_and_count_ops(path, expected_deletions): | |
975 | initial_ops = self.get_stat("objecter", "op") | |
976 | initial_pq_executed = self.get_stat("purge_queue", "pq_executed") | |
977 | ||
978 | self.mount_a.run_shell(["rm", "-rf", path]) | |
979 | ||
980 | self._wait_for_counter( | |
981 | "purge_queue", "pq_executed", initial_pq_executed + expected_deletions | |
982 | ) | |
983 | ||
984 | final_ops = self.get_stat("objecter", "op") | |
985 | ||
986 | # Calculation of the *overhead* operations, i.e. do not include | |
987 | # the operations where we actually delete files. | |
988 | return final_ops - initial_ops - expected_deletions | |
989 | ||
990 | self.fs.mds_asok(['flush', 'journal']) | |
991 | phase1_ops = unlink_and_count_ops("phase1/", phase_1_files + 1) | |
992 | ||
993 | self.fs.mds_asok(['flush', 'journal']) | |
994 | phase2_ops = unlink_and_count_ops("phase2/", phase_2_files + 1) | |
995 | ||
996 | log.info("Phase 1: {0}".format(phase1_ops)) | |
997 | log.info("Phase 2: {0}".format(phase2_ops)) | |
998 | ||
999 | # The success criterion is that deleting double the number | |
1000 | # of files doesn't generate double the number of overhead ops | |
1001 | # -- this comparison is a rough approximation of that rule. | |
1002 | self.assertTrue(phase2_ops < phase1_ops * 1.25) | |
1003 | ||
1004 | # Finally, check that our activity did include properly quiescing | |
1005 | # the queue (i.e. call to Journaler::write_head in the right place), | |
1006 | # by restarting the MDS and checking that it doesn't try re-executing | |
1007 | # any of the work we did. | |
1008 | self.fs.mds_asok(['flush', 'journal']) # flush to ensure no strays | |
1009 | # hanging around | |
1010 | self.fs.mds_fail_restart() | |
1011 | self.fs.wait_for_daemons() | |
1012 | time.sleep(10) | |
1013 | self.assertEqual(self.get_stat("purge_queue", "pq_executed"), 0) |