"""
Teuthology task for exercising CephFS client recovery
"""
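
# These tests are normally driven through the cephfs_test_runner teuthology
# task rather than run directly.  A minimal, illustrative job fragment
# (assuming the usual ceph and ceph-fuse tasks; adjust for your environment):
#
#   tasks:
#     - ceph:
#     - ceph-fuse:
#     - cephfs_test_runner:
#         modules:
#           - tasks.cephfs.test_client_recovery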

import logging
from textwrap import dedent
import time
import distutils.version as version
import re
import os

from teuthology.orchestra.run import CommandFailedError, ConnectionLostError
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.packaging import get_package_version


log = logging.getLogger(__name__)


# Arbitrary timeouts for operations involving restarting
# an MDS or waiting for it to come up
MDS_RESTART_GRACE = 60


class TestClientNetworkRecovery(CephFSTestCase):
    REQUIRE_KCLIENT_REMOTE = True
    REQUIRE_ONE_CLIENT_REMOTE = True
    CLIENTS_REQUIRED = 2

    LOAD_SETTINGS = ["mds_session_timeout", "mds_reconnect_timeout", "ms_max_backoff"]

    # Environment references
    mds_session_timeout = None
    mds_reconnect_timeout = None
    ms_max_backoff = None
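    # (These placeholders are filled in at test setup time: CephFSTestCase
    # looks up each name in LOAD_SETTINGS in the running cluster's
    # configuration and assigns the result here, so the timing assertions
    # below track whatever the cluster is actually configured with.)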

    def test_network_death(self):
        """
        Simulate software freeze or temporary network failure.

        Check that the client blocks I/O during failure, and completes
        I/O after failure.
        """

        # We only need one client
        self.mount_b.umount_wait()

        # Initially our one client session should be visible
        client_id = self.mount_a.get_global_id()
        ls_data = self._session_list()
        self.assert_session_count(1, ls_data)
        self.assertEqual(ls_data[0]['id'], client_id)
        self.assert_session_state(client_id, "open")

        # ...and capable of doing I/O without blocking
        self.mount_a.create_files()

        # ...but if we turn off the network
        self.fs.set_clients_block(True)

        # ...and try and start an I/O
        write_blocked = self.mount_a.write_background()

        # ...then it should block
        self.assertFalse(write_blocked.finished)
        self.assert_session_state(client_id, "open")
        time.sleep(self.mds_session_timeout * 1.5)  # Long enough for MDS to consider session stale
        self.assertFalse(write_blocked.finished)
        self.assert_session_state(client_id, "stale")

        # ...until we re-enable I/O
        self.fs.set_clients_block(False)

        # ...when it should complete promptly
        a = time.time()
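        # The messenger retries a blocked connection with exponential backoff
        # capped at ms_max_backoff seconds, so once traffic is unblocked the
        # stalled write should complete within roughly one backoff interval;
        # the 2x factor is just headroom.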
        self.wait_until_true(lambda: write_blocked.finished, self.ms_max_backoff * 2)
        write_blocked.wait()  # Already know we're finished, wait() to raise exception on errors
        recovery_time = time.time() - a
        log.info("recovery time: {0}".format(recovery_time))
        self.assert_session_state(client_id, "open")


class TestClientRecovery(CephFSTestCase):
    REQUIRE_KCLIENT_REMOTE = True
    CLIENTS_REQUIRED = 2

    LOAD_SETTINGS = ["mds_session_timeout", "mds_reconnect_timeout", "ms_max_backoff"]

    # Environment references
    mds_session_timeout = None
    mds_reconnect_timeout = None
    ms_max_backoff = None

    def test_basic(self):
        # Check that two clients come up healthy and see each others' files
        # =====================================================
        self.mount_a.create_files()
        self.mount_a.check_files()
        self.mount_a.umount_wait()

        self.mount_b.check_files()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # Check that the admin socket interface is correctly reporting
        # two sessions
        # =====================================================
        ls_data = self._session_list()
        self.assert_session_count(2, ls_data)

        self.assertSetEqual(
            set([l['id'] for l in ls_data]),
            {self.mount_a.get_global_id(), self.mount_b.get_global_id()}
        )

    def test_restart(self):
        # Check that after an MDS restart both clients reconnect and continue
        # to handle I/O
        # =====================================================
        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        self.mount_a.create_destroy()
        self.mount_b.create_destroy()

    def _session_num_caps(self, client_id):
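        """
        Return the number of capabilities that the MDS reports for the given
        client session, as shown by 'session ls' on the MDS admin socket.
        """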
        ls_data = self.fs.mds_asok(['session', 'ls'])
        return int(self._session_by_id(ls_data).get(client_id, {'num_caps': None})['num_caps'])

    def test_reconnect_timeout(self):
        # Reconnect timeout
        # =================
        # Check that if I stop an MDS and a client goes away, the MDS waits
        # for the reconnect period
        self.fs.mds_stop()
        self.fs.mds_fail()

        mount_a_client_id = self.mount_a.get_global_id()
        self.mount_a.umount_wait(force=True)

        self.fs.mds_restart()

        self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
        # Check that the MDS locally reports its state correctly
        status = self.fs.mds_asok(['status'])
        self.assertIn("reconnect_status", status)

        ls_data = self._session_list()
        self.assert_session_count(2, ls_data)

        # The session for the dead client should have the 'reconnect' flag set
        self.assertTrue(self.get_session(mount_a_client_id)['reconnecting'])

        # Wait for the reconnect state to clear, this should take the
        # reconnect timeout period.
        in_reconnect_for = self.fs.wait_for_state('up:active', timeout=self.mds_reconnect_timeout * 2)
        # Check that the period we waited to enter active is within a factor
        # of two of the reconnect timeout.
        self.assertGreater(in_reconnect_for, self.mds_reconnect_timeout / 2,
                           "Should have been in reconnect phase for {0} but only took {1}".format(
                               self.mds_reconnect_timeout, in_reconnect_for
                           ))

        self.assert_session_count(1)

        # Check that the client that timed out during reconnect can
        # mount again and do I/O
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        self.mount_a.create_destroy()

        self.assert_session_count(2)

    def test_reconnect_eviction(self):
        # Eviction during reconnect
        # =========================
        mount_a_client_id = self.mount_a.get_global_id()

        self.fs.mds_stop()
        self.fs.mds_fail()

        # The mount goes away while the MDS is offline
        self.mount_a.kill()

        self.fs.mds_restart()

        # Enter reconnect phase
        self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
        self.assert_session_count(2)

        # Evict the stuck client
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
        self.assert_session_count(1)

        # Observe that we proceed to active phase without waiting full reconnect timeout
        evict_til_active = self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)
        # Once we evict the troublemaker, the reconnect phase should complete
        # in well under the reconnect timeout.
        self.assertLess(evict_til_active, self.mds_reconnect_timeout * 0.5,
                        "reconnect did not complete soon enough after eviction, took {0}".format(
                            evict_til_active
                        ))

        # We killed earlier so must clean up before trying to use again
        self.mount_a.kill_cleanup()

        # Bring the client back
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        self.mount_a.create_destroy()

    def test_stale_caps(self):
        # Capability release from stale session
        # =====================================
        cap_holder = self.mount_a.open_background()

        # Wait for the file to be visible from another client, indicating
        # that mount_a has completed its network ops
        self.mount_b.wait_for_visible()

        # Simulate client death
        self.mount_a.kill()

        try:
            # Now, after mds_session_timeout seconds, the waiter should
            # complete their operation when the MDS marks the holder's
            # session stale.
            cap_waiter = self.mount_b.write_background()
            a = time.time()
            cap_waiter.wait()
            b = time.time()

            # Should have succeeded
            self.assertEqual(cap_waiter.exitstatus, 0)

            cap_waited = b - a
            log.info("cap_waiter waited {0}s".format(cap_waited))
            self.assertTrue(self.mds_session_timeout / 2.0 <= cap_waited <= self.mds_session_timeout * 2.0,
                            "Capability handover took {0}, expected approx {1}".format(
                                cap_waited, self.mds_session_timeout
                            ))

            cap_holder.stdin.close()
            try:
                cap_holder.wait()
            except (CommandFailedError, ConnectionLostError):
                # We killed it (and possibly its node), so it raises an error
                pass
        finally:
            # teardown() doesn't quite handle this case cleanly, so help it out
            self.mount_a.kill_cleanup()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

    def test_evicted_caps(self):
        # Eviction while holding a capability
        # ===================================

        # Take out a write capability on a file on client A,
        # and then immediately kill it.
        cap_holder = self.mount_a.open_background()
        mount_a_client_id = self.mount_a.get_global_id()

        # Wait for the file to be visible from another client, indicating
        # that mount_a has completed its network ops
        self.mount_b.wait_for_visible()

        # Simulate client death
        self.mount_a.kill()

        try:
            # The waiter should get stuck waiting for the capability
            # held on the MDS by the now-dead client A
            cap_waiter = self.mount_b.write_background()
            time.sleep(5)
            self.assertFalse(cap_waiter.finished)

            self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
            # Now, because I evicted the old holder of the capability, it should
            # immediately get handed over to the waiter
            a = time.time()
            cap_waiter.wait()
            b = time.time()
            cap_waited = b - a
            log.info("cap_waiter waited {0}s".format(cap_waited))
            # This is the check that it happened 'now' rather than waiting
            # for the session timeout
            self.assertLess(cap_waited, self.mds_session_timeout / 2.0,
                            "Capability handover took {0}, expected less than {1}".format(
                                cap_waited, self.mds_session_timeout / 2.0
                            ))

            cap_holder.stdin.close()
            try:
                cap_holder.wait()
            except (CommandFailedError, ConnectionLostError):
                # We killed it (and possibly its node), so it raises an error
                pass
        finally:
            self.mount_a.kill_cleanup()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

    def test_trim_caps(self):
        # Trim capability when reconnecting MDS
        # ===================================

        count = 500
        # Create lots of files
        for i in range(count):
            self.mount_a.run_shell(["touch", "f{0}".format(i)])

        # Populate mount_b's cache
        self.mount_b.run_shell(["ls", "-l"])

        client_id = self.mount_b.get_global_id()
        num_caps = self._session_num_caps(client_id)
        self.assertGreaterEqual(num_caps, count)

        # Restart MDS. client should trim its cache when reconnecting to the MDS
        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        num_caps = self._session_num_caps(client_id)
        self.assertLess(num_caps, count,
                        "should have less than {0} capabilities, have {1}".format(
                            count, num_caps
                        ))

    def _is_flockable(self):
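        """
        Return True only if the fuse package on both client remotes is new
        enough to support flock (FUSE gained flock lock support in 2.9);
        otherwise log why flock tests are being skipped and return False.
        """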
        a_version_str = get_package_version(self.mount_a.client_remote, "fuse")
        b_version_str = get_package_version(self.mount_b.client_remote, "fuse")
        flock_version_str = "2.9"

        version_regex = re.compile(r"[0-9\.]+")
        a_result = version_regex.match(a_version_str)
        self.assertTrue(a_result)
        b_result = version_regex.match(b_version_str)
        self.assertTrue(b_result)
        a_version = version.StrictVersion(a_result.group())
        b_version = version.StrictVersion(b_result.group())
        flock_version = version.StrictVersion(flock_version_str)

        if (a_version >= flock_version and b_version >= flock_version):
            log.info("flock locks are available")
            return True
        else:
            log.info("not testing flock locks, machines have versions {av} and {bv}".format(
                av=a_version_str, bv=b_version_str))
            return False

    def test_filelock(self):
        """
        Check that file lock doesn't get lost after an MDS restart
        """

        flockable = self._is_flockable()
        lock_holder = self.mount_a.lock_background(do_flock=flockable)

        self.mount_b.wait_for_visible("background_file-2")
        self.mount_b.check_filelock(do_flock=flockable)

        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        self.mount_b.check_filelock(do_flock=flockable)

        # Tear down the background process
        lock_holder.stdin.close()
        try:
            lock_holder.wait()
        except (CommandFailedError, ConnectionLostError):
            # We killed it, so it raises an error
            pass

    def test_filelock_eviction(self):
        """
        Check that a file lock held by an evicted client is given to
        the waiting client.
        """
        if not self._is_flockable():
            self.skipTest("flock is not available")

        lock_holder = self.mount_a.lock_background()
        self.mount_b.wait_for_visible("background_file-2")
        self.mount_b.check_filelock()

        lock_taker = self.mount_b.lock_and_release()
        # Check the taker is waiting (doesn't get it immediately)
        time.sleep(2)
        self.assertFalse(lock_holder.finished)
        self.assertFalse(lock_taker.finished)

        mount_a_client_id = self.mount_a.get_global_id()
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])

        # Evicting mount_a should let mount_b's attempt to take the lock
        # succeed
        self.wait_until_true(
            lambda: lock_taker.finished,
            timeout=10)

    def test_dir_fsync(self):
        self._test_fsync(True)

    def test_create_fsync(self):
        self._test_fsync(False)

    def _test_fsync(self, dirfsync):
        """
        That calls to fsync guarantee visibility of metadata to another
        client immediately after the fsyncing client dies.
        """

        # Leave this guy out until he's needed
        self.mount_b.umount_wait()

        # Create dir + child dentry on client A, and fsync the dir
        path = os.path.join(self.mount_a.mountpoint, "subdir")
        self.mount_a.run_python(
            dedent("""
            import os
            import time

            path = "{path}"

            print("Starting creation...")
            start = time.time()

            os.mkdir(path)
            dfd = os.open(path, os.O_DIRECTORY)

            fd = open(os.path.join(path, "childfile"), "w")
            print("Finished creation in {{0}}s".format(time.time() - start))

            print("Starting fsync...")
            start = time.time()
            if {dirfsync}:
                os.fsync(dfd)
            else:
                os.fsync(fd)
            print("Finished fsync in {{0}}s".format(time.time() - start))
            """.format(path=path, dirfsync=str(dirfsync)))
        )

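        # If the fsync did its job, the new directory and its child dentry are
        # already durable in the MDS journal at this point, so they must remain
        # visible even though neither the client nor the MDS gets a chance to
        # flush anything further before being killed below.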
        # Immediately kill the MDS and then client A
        self.fs.mds_stop()
        self.fs.mds_fail()
        self.mount_a.kill()
        self.mount_a.kill_cleanup()

        # Restart the MDS.  Wait for it to come up, it'll have to time out in clientreplay
        self.fs.mds_restart()
        log.info("Waiting for reconnect...")
        self.fs.wait_for_state("up:reconnect")
        log.info("Waiting for active...")
        self.fs.wait_for_state("up:active", timeout=MDS_RESTART_GRACE + self.mds_reconnect_timeout)
        log.info("Reached active...")

        # Is the child dentry visible from mount B?
        self.mount_b.mount()
        self.mount_b.wait_until_mounted()
        self.mount_b.run_shell(["ls", "subdir/childfile"])