"""
Teuthology task for exercising CephFS client recovery
"""

import logging
from textwrap import dedent
import time
import distutils.version as version
import re
import os

from teuthology.orchestra import run
from teuthology.orchestra.run import CommandFailedError, ConnectionLostError
from tasks.cephfs.fuse_mount import FuseMount
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.packaging import get_package_version

log = logging.getLogger(__name__)


# Arbitrary timeouts for operations involving restarting
# an MDS or waiting for it to come up
MDS_RESTART_GRACE = 60


class TestClientNetworkRecovery(CephFSTestCase):
    REQUIRE_KCLIENT_REMOTE = True
    REQUIRE_ONE_CLIENT_REMOTE = True
    CLIENTS_REQUIRED = 2

    LOAD_SETTINGS = ["mds_reconnect_timeout", "ms_max_backoff"]
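    # Note: values named in LOAD_SETTINGS are looked up in the running
    # cluster's configuration by the base test case and bound to the
    # same-named attributes below.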

    # Environment references
    mds_reconnect_timeout = None
    ms_max_backoff = None

    def test_network_death(self):
        """
        Simulate software freeze or temporary network failure.

        Check that the client blocks I/O during failure, and completes
        I/O after failure.
        """

        session_timeout = self.fs.get_var("session_timeout")
        self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])

        # We only need one client
        self.mount_b.umount_wait()

        # Initially our one client session should be visible
        client_id = self.mount_a.get_global_id()
        ls_data = self._session_list()
        self.assert_session_count(1, ls_data)
        self.assertEqual(ls_data[0]['id'], client_id)
        self.assert_session_state(client_id, "open")

        # ...and capable of doing I/O without blocking
        self.mount_a.create_files()

        # ...but if we turn off the network
        self.fs.set_clients_block(True)

        # ...and try to start an I/O
        write_blocked = self.mount_a.write_background()

        # ...then it should block
        self.assertFalse(write_blocked.finished)
        self.assert_session_state(client_id, "open")
        time.sleep(session_timeout * 1.5)  # Long enough for MDS to consider session stale
        self.assertFalse(write_blocked.finished)
        self.assert_session_state(client_id, "stale")

        # ...until we re-enable I/O
        self.fs.set_clients_block(False)

        # ...when it should complete promptly
        a = time.time()
        self.wait_until_true(lambda: write_blocked.finished, self.ms_max_backoff * 2)
        write_blocked.wait()  # Already know we're finished, wait() to raise exception on errors
        recovery_time = time.time() - a
        log.info("recovery time: {0}".format(recovery_time))
        self.assert_session_state(client_id, "open")


class TestClientRecovery(CephFSTestCase):
    REQUIRE_KCLIENT_REMOTE = True
    CLIENTS_REQUIRED = 2

    LOAD_SETTINGS = ["mds_reconnect_timeout", "ms_max_backoff"]

    # Environment references
    mds_reconnect_timeout = None
    ms_max_backoff = None

    def test_basic(self):
        # Check that two clients come up healthy and see each others' files
        # =====================================================
        self.mount_a.create_files()
        self.mount_a.check_files()
        self.mount_a.umount_wait()

        self.mount_b.check_files()

        self.mount_a.mount_wait()

        # Check that the admin socket interface is correctly reporting
        # two sessions
        # =====================================================
        ls_data = self._session_list()
        self.assert_session_count(2, ls_data)

        self.assertSetEqual(
            set([l['id'] for l in ls_data]),
            {self.mount_a.get_global_id(), self.mount_b.get_global_id()}
        )

    def test_restart(self):
        # Check that after an MDS restart both clients reconnect and continue
        # to handle I/O
        # =====================================================
        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        self.mount_a.create_destroy()
        self.mount_b.create_destroy()

    def _session_num_caps(self, client_id):
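        """
        Return the number of caps held by the given client session,
        according to the MDS "session ls" admin socket output.
        """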
        ls_data = self.fs.mds_asok(['session', 'ls'])
        return int(self._session_by_id(ls_data).get(client_id, {'num_caps': None})['num_caps'])

    def test_reconnect_timeout(self):
        # Reconnect timeout
        # =================
        # Check that if I stop an MDS and a client goes away, the MDS waits
        # for the reconnect period
        self.fs.mds_stop()
        self.fs.mds_fail()

        mount_a_client_id = self.mount_a.get_global_id()
        self.mount_a.umount_wait(force=True)

        self.fs.mds_restart()

        self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
        # Check that the MDS locally reports its state correctly
        status = self.fs.mds_asok(['status'])
        self.assertIn("reconnect_status", status)

        ls_data = self._session_list()
        self.assert_session_count(2, ls_data)

        # The session for the dead client should have the 'reconnect' flag set
        self.assertTrue(self.get_session(mount_a_client_id)['reconnecting'])

        # Wait for the reconnect state to clear, this should take the
        # reconnect timeout period.
        in_reconnect_for = self.fs.wait_for_state('up:active', timeout=self.mds_reconnect_timeout * 2)
        # Check that the period we waited to enter active is within a factor
        # of two of the reconnect timeout.
        self.assertGreater(in_reconnect_for, self.mds_reconnect_timeout // 2,
                           "Should have been in reconnect phase for {0} but only took {1}".format(
                               self.mds_reconnect_timeout, in_reconnect_for
                           ))

        self.assert_session_count(1)

        # Check that the client that timed out during reconnect can
        # mount again and do I/O
        self.mount_a.mount_wait()
        self.mount_a.create_destroy()

        self.assert_session_count(2)

    def test_reconnect_eviction(self):
        # Eviction during reconnect
        # =========================
        mount_a_client_id = self.mount_a.get_global_id()

        self.fs.mds_stop()
        self.fs.mds_fail()

        # The mount goes away while the MDS is offline
        self.mount_a.kill()

        # wait for it to die
        time.sleep(5)

        self.fs.mds_restart()

        # Enter reconnect phase
        self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
        self.assert_session_count(2)

        # Evict the stuck client
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
        self.assert_session_count(1)

        # Observe that we proceed to active phase without waiting full reconnect timeout
        evict_til_active = self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)
        # Once we evict the troublemaker, the reconnect phase should complete
        # in well under the reconnect timeout.
        self.assertLess(evict_til_active, self.mds_reconnect_timeout * 0.5,
                        "reconnect did not complete soon enough after eviction, took {0}".format(
                            evict_til_active
                        ))

        # We killed earlier so must clean up before trying to use again
        self.mount_a.kill_cleanup()

        # Bring the client back
        self.mount_a.mount_wait()
        self.mount_a.create_destroy()

    def _test_stale_caps(self, write):
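        """
        Check that a second client's I/O completes once the MDS marks the
        dead cap holder's session stale. ``write`` selects whether the
        holder holds a write cap or only read caps on the file.
        """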
        session_timeout = self.fs.get_var("session_timeout")

        # Capability release from stale session
        # =====================================
        if write:
            cap_holder = self.mount_a.open_background()
        else:
            self.mount_a.run_shell(["touch", "background_file"])
            self.mount_a.umount_wait()
            self.mount_a.mount_wait()
            cap_holder = self.mount_a.open_background(write=False)

        self.assert_session_count(2)
        mount_a_gid = self.mount_a.get_global_id()

        # Wait for the file to be visible from another client, indicating
        # that mount_a has completed its network ops
        self.mount_b.wait_for_visible()

        # Simulate client death
        self.mount_a.kill()

        # wait for it to die so it doesn't voluntarily release buffer cap
        time.sleep(5)

        try:
            # Now, after session_timeout seconds, the waiter should
            # complete their operation when the MDS marks the holder's
            # session stale.
            cap_waiter = self.mount_b.write_background()
            a = time.time()
            cap_waiter.wait()
            b = time.time()

            # Should have succeeded
            self.assertEqual(cap_waiter.exitstatus, 0)

            if write:
                self.assert_session_count(1)
            else:
                self.assert_session_state(mount_a_gid, "stale")

            cap_waited = b - a
            log.info("cap_waiter waited {0}s".format(cap_waited))
            self.assertTrue(session_timeout / 2.0 <= cap_waited <= session_timeout * 2.0,
                            "Capability handover took {0}, expected approx {1}".format(
                                cap_waited, session_timeout
                            ))

            cap_holder.stdin.close()
            try:
                cap_holder.wait()
            except (CommandFailedError, ConnectionLostError):
                # We killed it (and possibly its node), so it raises an error
                pass
        finally:
            # teardown() doesn't quite handle this case cleanly, so help it out
            self.mount_a.kill_cleanup()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

    def test_stale_read_caps(self):
        self._test_stale_caps(False)

    def test_stale_write_caps(self):
        self._test_stale_caps(True)

    def test_evicted_caps(self):
        # Eviction while holding a capability
        # ===================================

        session_timeout = self.fs.get_var("session_timeout")

        # Take out a write capability on a file on client A,
        # and then immediately kill it.
        cap_holder = self.mount_a.open_background()
        mount_a_client_id = self.mount_a.get_global_id()

        # Wait for the file to be visible from another client, indicating
        # that mount_a has completed its network ops
        self.mount_b.wait_for_visible()

        # Simulate client death
        self.mount_a.kill()

        # wait for it to die so it doesn't voluntarily release buffer cap
        time.sleep(5)

        try:
            # The waiter should get stuck waiting for the capability
            # held on the MDS by the now-dead client A
            cap_waiter = self.mount_b.write_background()
            time.sleep(5)
            self.assertFalse(cap_waiter.finished)

            self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
            # Now, because I evicted the old holder of the capability, it should
            # immediately get handed over to the waiter
            a = time.time()
            cap_waiter.wait()
            b = time.time()
            cap_waited = b - a
            log.info("cap_waiter waited {0}s".format(cap_waited))
            # This is the check that it happened 'now' rather than waiting
            # for the session timeout
            self.assertLess(cap_waited, session_timeout / 2.0,
                            "Capability handover took {0}, expected less than {1}".format(
                                cap_waited, session_timeout / 2.0
                            ))

            cap_holder.stdin.close()
            try:
                cap_holder.wait()
            except (CommandFailedError, ConnectionLostError):
                # We killed it (and possibly its node), so it raises an error
                pass
        finally:
            self.mount_a.kill_cleanup()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

    def test_trim_caps(self):
        # Trim capability when reconnecting MDS
        # ===================================

        count = 500
        # Create lots of files
        for i in range(count):
            self.mount_a.run_shell(["touch", "f{0}".format(i)])

        # Populate mount_b's cache
        self.mount_b.run_shell(["ls", "-l"])

        client_id = self.mount_b.get_global_id()
        num_caps = self._session_num_caps(client_id)
        self.assertGreaterEqual(num_caps, count)

        # Restart MDS. client should trim its cache when reconnecting to the MDS
        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        num_caps = self._session_num_caps(client_id)
        self.assertLess(num_caps, count,
                        "should have less than {0} capabilities, have {1}".format(
                            count, num_caps
                        ))

    def _is_flockable(self):
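        """
        Return True if the fuse package on both client remotes is new enough
        (>= 2.9) to support flock locks, otherwise False.
        """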
        a_version_str = get_package_version(self.mount_a.client_remote, "fuse")
        b_version_str = get_package_version(self.mount_b.client_remote, "fuse")
        flock_version_str = "2.9"

        version_regex = re.compile(r"[0-9\.]+")
        a_result = version_regex.match(a_version_str)
        self.assertTrue(a_result)
        b_result = version_regex.match(b_version_str)
        self.assertTrue(b_result)
        a_version = version.StrictVersion(a_result.group())
        b_version = version.StrictVersion(b_result.group())
        flock_version = version.StrictVersion(flock_version_str)

        if a_version >= flock_version and b_version >= flock_version:
            log.info("flock locks are available")
            return True
        else:
            log.info("not testing flock locks, machines have versions {av} and {bv}".format(
                av=a_version_str, bv=b_version_str))
            return False

    def test_filelock(self):
        """
        Check that a file lock doesn't get lost after an MDS restart
        """

        flockable = self._is_flockable()
        lock_holder = self.mount_a.lock_background(do_flock=flockable)

        self.mount_b.wait_for_visible("background_file-2")
        self.mount_b.check_filelock(do_flock=flockable)

        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        self.mount_b.check_filelock(do_flock=flockable)

        # Tear down the background process
        lock_holder.stdin.close()
        try:
            lock_holder.wait()
        except (CommandFailedError, ConnectionLostError):
            # We killed it, so it raises an error
            pass

    def test_filelock_eviction(self):
        """
        Check that a file lock held by an evicted client is handed to
        the waiting client.
        """
        if not self._is_flockable():
            self.skipTest("flock is not available")

        lock_holder = self.mount_a.lock_background()
        self.mount_b.wait_for_visible("background_file-2")
        self.mount_b.check_filelock()

        lock_taker = self.mount_b.lock_and_release()
        # Check the taker is waiting (doesn't get it immediately)
        time.sleep(2)
        self.assertFalse(lock_holder.finished)
        self.assertFalse(lock_taker.finished)

        try:
            mount_a_client_id = self.mount_a.get_global_id()
            self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])

            # Evicting mount_a should let mount_b's attempt to take the lock
            # succeed
            self.wait_until_true(lambda: lock_taker.finished, timeout=10)
        finally:
            # teardown() doesn't quite handle this case cleanly, so help it out
            self.mount_a.kill()
            self.mount_a.kill_cleanup()

        # Bring the client back
        self.mount_a.mount_wait()

    def test_dir_fsync(self):
        self._test_fsync(True)

    def test_create_fsync(self):
        self._test_fsync(False)

    def _test_fsync(self, dirfsync):
        """
        That calls to fsync guarantee visibility of metadata to another
        client immediately after the fsyncing client dies.
        """

        # Leave this guy out until he's needed
        self.mount_b.umount_wait()

        # Create dir + child dentry on client A, and fsync the dir
        path = os.path.join(self.mount_a.mountpoint, "subdir")
        self.mount_a.run_python(
            dedent("""
                import os
                import time

                path = "{path}"

                print("Starting creation...")
                start = time.time()

                os.mkdir(path)
                dfd = os.open(path, os.O_DIRECTORY)

                fd = open(os.path.join(path, "childfile"), "w")
                print("Finished creation in {{0}}s".format(time.time() - start))

                print("Starting fsync...")
                start = time.time()
                if {dirfsync}:
                    os.fsync(dfd)
                else:
                    os.fsync(fd)
                print("Finished fsync in {{0}}s".format(time.time() - start))
                """.format(path=path, dirfsync=str(dirfsync)))
        )

        # Immediately kill the MDS and then client A
        self.fs.mds_stop()
        self.fs.mds_fail()
        self.mount_a.kill()
        self.mount_a.kill_cleanup()

        # Restart the MDS. Wait for it to come up, it'll have to time out in clientreplay
        self.fs.mds_restart()
        log.info("Waiting for reconnect...")
        self.fs.wait_for_state("up:reconnect")
        log.info("Waiting for active...")
        self.fs.wait_for_state("up:active", timeout=MDS_RESTART_GRACE + self.mds_reconnect_timeout)
        log.info("Reached active...")

        # Is the child dentry visible from mount B?
        self.mount_b.mount_wait()
        self.mount_b.run_shell(["ls", "subdir/childfile"])

    def test_unmount_for_evicted_client(self):
        """Test that the client does not hang on unmount after it has been evicted."""
        mount_a_client_id = self.mount_a.get_global_id()
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])

        self.mount_a.umount_wait(require_clean=True, timeout=30)

    def test_stale_renew(self):
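        """
        Stop a ceph-fuse client with SIGSTOP so its session goes stale, then
        resume it and check that it renews the session and revalidates its
        cached state (it must see a file created while it was stopped).
        """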
        if not isinstance(self.mount_a, FuseMount):
            self.skipTest("Require FUSE client to handle signal STOP/CONT")

        session_timeout = self.fs.get_var("session_timeout")

        self.mount_a.run_shell(["mkdir", "testdir"])
        self.mount_a.run_shell(["touch", "testdir/file1"])
        # populate readdir cache
        self.mount_a.run_shell(["ls", "testdir"])
        self.mount_b.run_shell(["ls", "testdir"])

        # check if readdir cache is effective
        initial_readdirs = self.fs.mds_asok(['perf', 'dump', 'mds_server', 'req_readdir_latency'])
        self.mount_b.run_shell(["ls", "testdir"])
        current_readdirs = self.fs.mds_asok(['perf', 'dump', 'mds_server', 'req_readdir_latency'])
        self.assertEqual(current_readdirs, initial_readdirs)

        mount_b_gid = self.mount_b.get_global_id()
        mount_b_pid = self.mount_b.get_client_pid()
        # stop ceph-fuse process of mount_b
        self.mount_b.client_remote.run(args=["sudo", "kill", "-STOP", mount_b_pid])

        self.assert_session_state(mount_b_gid, "open")
        time.sleep(session_timeout * 1.5)  # Long enough for MDS to consider session stale

        self.mount_a.run_shell(["touch", "testdir/file2"])
        self.assert_session_state(mount_b_gid, "stale")

        # resume ceph-fuse process of mount_b
        self.mount_b.client_remote.run(args=["sudo", "kill", "-CONT", mount_b_pid])
        # Is the new file visible from mount_b? (caps become invalid after session stale)
        self.mount_b.run_shell(["ls", "testdir/file2"])

    def test_abort_conn(self):
        """
        Check that abort_conn() skips closing mds sessions.
        """
        if not isinstance(self.mount_a, FuseMount):
            self.skipTest("Testing libcephfs function")

        self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])
        session_timeout = self.fs.get_var("session_timeout")

        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        gid_str = self.mount_a.run_python(dedent("""
            import cephfs as libcephfs
            cephfs = libcephfs.LibCephFS(conffile='')
            cephfs.mount()
            client_id = cephfs.get_instance_id()
            cephfs.abort_conn()
            print(client_id)
            """)
        )
        gid = int(gid_str)

        self.assert_session_state(gid, "open")
        time.sleep(session_timeout * 1.5)  # Long enough for MDS to consider session stale
        self.assert_session_state(gid, "stale")

    def test_dont_mark_unresponsive_client_stale(self):
        """
        Test that an unresponsive client holding caps is not marked stale or
        evicted unless another client wants its caps.
        """
        if not isinstance(self.mount_a, FuseMount):
            self.skipTest("Require FUSE client to handle signal STOP/CONT")

        # XXX: To conduct this test we need at least two clients since a
        # single client is never evicted by the MDS.
        SESSION_TIMEOUT = 30
        SESSION_AUTOCLOSE = 50
        time_at_beg = time.time()
        mount_a_gid = self.mount_a.get_global_id()
        _ = self.mount_a.client_pid
        self.fs.set_var('session_timeout', SESSION_TIMEOUT)
        self.fs.set_var('session_autoclose', SESSION_AUTOCLOSE)
        self.assert_session_count(2, self.fs.mds_asok(['session', 'ls']))

        # test that a client holding caps not required by any other client is
        # not marked stale when it becomes unresponsive.
        self.mount_a.run_shell(['mkdir', 'dir'])
        self.mount_a.send_signal('sigstop')
        time.sleep(SESSION_TIMEOUT + 2)
        self.assert_session_state(mount_a_gid, "open")

        # test that other clients have to wait to get the caps from the
        # unresponsive client until session_autoclose.
        self.mount_b.run_shell(['stat', 'dir'])
        self.assert_session_count(1, self.fs.mds_asok(['session', 'ls']))
        self.assertLess(time.time(), time_at_beg + SESSION_AUTOCLOSE)

        self.mount_a.send_signal('sigcont')

    def test_config_session_timeout(self):
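        """
        Check that a per-session timeout set with the "session config" admin
        socket command overrides the global session_timeout: a killed client
        given a doubled timeout must still be listed after the default
        timeout elapses, and only disappear after the longer one.
        """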
        self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])
        session_timeout = self.fs.get_var("session_timeout")
        mount_a_gid = self.mount_a.get_global_id()

        self.fs.mds_asok(['session', 'config', '%s' % mount_a_gid, 'timeout', '%s' % (session_timeout * 2)])

        self.mount_a.kill()

        self.assert_session_count(2)

        time.sleep(session_timeout * 1.5)
        self.assert_session_state(mount_a_gid, "open")

        time.sleep(session_timeout)
        self.assert_session_count(1)

        self.mount_a.kill_cleanup()

    def test_reconnect_after_blacklisted(self):
        """
        Test reconnect after being blacklisted.
        - writing to a fd that was opened before blacklist should return -EBADF
        - reading/writing to a file with lost file locks should return -EIO
        - readonly fd should continue to work
        """

        self.mount_a.umount_wait()

        if isinstance(self.mount_a, FuseMount):
            self.mount_a.mount(mount_options=['--client_reconnect_stale=1', '--fuse_disable_pagecache=1'])
        else:
            try:
                self.mount_a.mount(mount_options=['recover_session=clean'])
            except CommandFailedError:
                self.mount_a.kill_cleanup()
                self.skipTest("Not implemented in current kernel")

        self.mount_a.wait_until_mounted()

        path = os.path.join(self.mount_a.mountpoint, 'testfile_reconnect_after_blacklisted')
        pyscript = dedent("""
            import os
            import sys
            import fcntl
            import errno
            import time

            fd1 = os.open("{path}.1", os.O_RDWR | os.O_CREAT, 0o666)
            fd2 = os.open("{path}.1", os.O_RDONLY)
            fd3 = os.open("{path}.2", os.O_RDWR | os.O_CREAT, 0o666)
            fd4 = os.open("{path}.2", os.O_RDONLY)

            os.write(fd1, b'content')
            os.read(fd2, 1)

            os.write(fd3, b'content')
            os.read(fd4, 1)
            fcntl.flock(fd4, fcntl.LOCK_SH | fcntl.LOCK_NB)

            print("blacklist")
            sys.stdout.flush()

            sys.stdin.readline()

            # wait for mds to close the session
            time.sleep(10)

            # trigger an 'open session' message; the kclient relies on the
            # 'session reject' message to detect that it has been blacklisted
            try:
                os.stat("{path}.1")
            except:
                pass

            # wait for auto reconnect
            time.sleep(10)

            try:
                os.write(fd1, b'content')
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
            else:
                raise RuntimeError("write() failed to raise error")

            os.read(fd2, 1)

            try:
                os.read(fd4, 1)
            except OSError as e:
                if e.errno != errno.EIO:
                    raise
            else:
                raise RuntimeError("read() failed to raise error")
            """).format(path=path)
        rproc = self.mount_a.client_remote.run(
            args=['sudo', 'python3', '-c', pyscript],
            wait=False, stdin=run.PIPE, stdout=run.PIPE)

        rproc.stdout.readline()

        mount_a_client_id = self.mount_a.get_global_id()
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])

        rproc.stdin.writelines(['done\n'])
        rproc.stdin.flush()

        rproc.wait()
        self.assertEqual(rproc.exitstatus, 0)