"""
Teuthology task for exercising CephFS client recovery
"""

import logging
from textwrap import dedent
import time
import distutils.version as version
import re
import os

from teuthology.orchestra import run
from teuthology.orchestra.run import CommandFailedError, ConnectionLostError
from tasks.cephfs.fuse_mount import FuseMount
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.packaging import get_package_version

log = logging.getLogger(__name__)


# Arbitrary timeouts for operations involving restarting
# an MDS or waiting for it to come up
MDS_RESTART_GRACE = 60


class TestClientNetworkRecovery(CephFSTestCase):
    REQUIRE_KCLIENT_REMOTE = True
    REQUIRE_ONE_CLIENT_REMOTE = True
    CLIENTS_REQUIRED = 2

    LOAD_SETTINGS = ["mds_reconnect_timeout", "ms_max_backoff"]

    # Environment references
    mds_reconnect_timeout = None
    ms_max_backoff = None

    def test_network_death(self):
        """
        Simulate software freeze or temporary network failure.

        Check that the client blocks I/O during failure, and completes
        I/O after failure.
        """

        session_timeout = self.fs.get_var("session_timeout")
        self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])

        # We only need one client
        self.mount_b.umount_wait()

        # Initially our one client session should be visible
        client_id = self.mount_a.get_global_id()
        ls_data = self._session_list()
        self.assert_session_count(1, ls_data)
        self.assertEqual(ls_data[0]['id'], client_id)
        self.assert_session_state(client_id, "open")

        # ...and capable of doing I/O without blocking
        self.mount_a.create_files()

        # ...but if we turn off the network
        self.fs.set_clients_block(True)

        # ...and try and start an I/O
        write_blocked = self.mount_a.write_background()

        # ...then it should block
        self.assertFalse(write_blocked.finished)
        self.assert_session_state(client_id, "open")
        time.sleep(session_timeout * 1.5)  # Long enough for MDS to consider session stale
        self.assertFalse(write_blocked.finished)
        self.assert_session_state(client_id, "stale")

        # ...until we re-enable I/O
        self.fs.set_clients_block(False)

        # ...when it should complete promptly
        a = time.time()
        self.wait_until_true(lambda: write_blocked.finished, self.ms_max_backoff * 2)
        write_blocked.wait()  # Already know we're finished, wait() to raise exception on errors
        recovery_time = time.time() - a
        log.info("recovery time: {0}".format(recovery_time))
        self.assert_session_state(client_id, "open")


class TestClientRecovery(CephFSTestCase):
    REQUIRE_KCLIENT_REMOTE = True
    CLIENTS_REQUIRED = 2

    LOAD_SETTINGS = ["mds_reconnect_timeout", "ms_max_backoff"]

    # Environment references
    mds_reconnect_timeout = None
    ms_max_backoff = None

    def test_basic(self):
        # Check that two clients come up healthy and see each others' files
        # =====================================================
        self.mount_a.create_files()
        self.mount_a.check_files()
        self.mount_a.umount_wait()

        self.mount_b.check_files()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # Check that the admin socket interface is correctly reporting
        # two sessions
        # =====================================================
        ls_data = self._session_list()
        self.assert_session_count(2, ls_data)

        self.assertSetEqual(
            set([l['id'] for l in ls_data]),
            {self.mount_a.get_global_id(), self.mount_b.get_global_id()}
        )

    def test_restart(self):
        # Check that after an MDS restart both clients reconnect and continue
        # to handle I/O
        # =====================================================
        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        self.mount_a.create_destroy()
        self.mount_b.create_destroy()

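    # Helper: return the number of capabilities the MDS reports for the given
    # client session in its 'session ls' output.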
    def _session_num_caps(self, client_id):
        ls_data = self.fs.mds_asok(['session', 'ls'])
        return int(self._session_by_id(ls_data).get(client_id, {'num_caps': None})['num_caps'])

    def test_reconnect_timeout(self):
        # Reconnect timeout
        # =================
        # Check that if I stop an MDS and a client goes away, the MDS waits
        # for the reconnect period
        self.fs.mds_stop()
        self.fs.mds_fail()

        mount_a_client_id = self.mount_a.get_global_id()
        self.mount_a.umount_wait(force=True)

        self.fs.mds_restart()

        self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
        # Check that the MDS locally reports its state correctly
        status = self.fs.mds_asok(['status'])
        self.assertIn("reconnect_status", status)

        ls_data = self._session_list()
        self.assert_session_count(2, ls_data)

        # The session for the dead client should have the 'reconnect' flag set
        self.assertTrue(self.get_session(mount_a_client_id)['reconnecting'])

        # Wait for the reconnect state to clear, this should take the
        # reconnect timeout period.
        in_reconnect_for = self.fs.wait_for_state('up:active', timeout=self.mds_reconnect_timeout * 2)
        # Check that the period we waited to enter active is within a factor
        # of two of the reconnect timeout.
        self.assertGreater(in_reconnect_for, self.mds_reconnect_timeout / 2,
                           "Should have been in reconnect phase for {0} but only took {1}".format(
                               self.mds_reconnect_timeout, in_reconnect_for
                           ))

        self.assert_session_count(1)

        # Check that the client that timed out during reconnect can
        # mount again and do I/O
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        self.mount_a.create_destroy()

        self.assert_session_count(2)

    def test_reconnect_eviction(self):
        # Eviction during reconnect
        # =========================
        mount_a_client_id = self.mount_a.get_global_id()

        self.fs.mds_stop()
        self.fs.mds_fail()

        # The mount goes away while the MDS is offline
        self.mount_a.kill()

        # wait for it to die
        time.sleep(5)

        self.fs.mds_restart()

        # Enter reconnect phase
        self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
        self.assert_session_count(2)

        # Evict the stuck client
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
        self.assert_session_count(1)

        # Observe that we proceed to active phase without waiting full reconnect timeout
        evict_til_active = self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)
        # Once we evict the troublemaker, the reconnect phase should complete
        # in well under the reconnect timeout.
        self.assertLess(evict_til_active, self.mds_reconnect_timeout * 0.5,
                        "reconnect did not complete soon enough after eviction, took {0}".format(
                            evict_til_active
                        ))

        # We killed earlier so must clean up before trying to use again
        self.mount_a.kill_cleanup()

        # Bring the client back
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        self.mount_a.create_destroy()

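    # Shared body for test_stale_read_caps/test_stale_write_caps: a cap held
    # by a killed client should be handed over to a waiting client once the
    # MDS marks the dead client's session stale (around session_timeout).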
    def _test_stale_caps(self, write):
        session_timeout = self.fs.get_var("session_timeout")

        # Capability release from stale session
        # =====================================
        if write:
            cap_holder = self.mount_a.open_background()
        else:
            self.mount_a.run_shell(["touch", "background_file"])
            self.mount_a.umount_wait()
            self.mount_a.mount()
            self.mount_a.wait_until_mounted()
            cap_holder = self.mount_a.open_background(write=False)

        self.assert_session_count(2)
        mount_a_gid = self.mount_a.get_global_id()

        # Wait for the file to be visible from another client, indicating
        # that mount_a has completed its network ops
        self.mount_b.wait_for_visible()

        # Simulate client death
        self.mount_a.kill()

        # wait for it to die so it doesn't voluntarily release buffer cap
        time.sleep(5)

        try:
            # Now, after session_timeout seconds, the waiter should
            # complete their operation when the MDS marks the holder's
            # session stale.
            cap_waiter = self.mount_b.write_background()
            a = time.time()
            cap_waiter.wait()
            b = time.time()

            # Should have succeeded
            self.assertEqual(cap_waiter.exitstatus, 0)

            if write:
                self.assert_session_count(1)
            else:
                self.assert_session_state(mount_a_gid, "stale")

            cap_waited = b - a
            log.info("cap_waiter waited {0}s".format(cap_waited))
            self.assertTrue(session_timeout / 2.0 <= cap_waited <= session_timeout * 2.0,
                            "Capability handover took {0}, expected approx {1}".format(
                                cap_waited, session_timeout
                            ))

            cap_holder.stdin.close()
            try:
                cap_holder.wait()
            except (CommandFailedError, ConnectionLostError):
                # We killed it (and possibly its node), so it raises an error
                pass
        finally:
            # teardown() doesn't quite handle this case cleanly, so help it out
            self.mount_a.kill_cleanup()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

    def test_stale_read_caps(self):
        self._test_stale_caps(False)

    def test_stale_write_caps(self):
        self._test_stale_caps(True)

    def test_evicted_caps(self):
        # Eviction while holding a capability
        # ===================================

        session_timeout = self.fs.get_var("session_timeout")

        # Take out a write capability on a file on client A,
        # and then immediately kill it.
        cap_holder = self.mount_a.open_background()
        mount_a_client_id = self.mount_a.get_global_id()

        # Wait for the file to be visible from another client, indicating
        # that mount_a has completed its network ops
        self.mount_b.wait_for_visible()

        # Simulate client death
        self.mount_a.kill()

        # wait for it to die so it doesn't voluntarily release buffer cap
        time.sleep(5)

        try:
            # The waiter should get stuck waiting for the capability
            # held on the MDS by the now-dead client A
            cap_waiter = self.mount_b.write_background()
            time.sleep(5)
            self.assertFalse(cap_waiter.finished)

            self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
            # Now, because I evicted the old holder of the capability, it should
            # immediately get handed over to the waiter
            a = time.time()
            cap_waiter.wait()
            b = time.time()
            cap_waited = b - a
            log.info("cap_waiter waited {0}s".format(cap_waited))
            # This is the check that it happened 'now' rather than waiting
            # for the session timeout
            self.assertLess(cap_waited, session_timeout / 2.0,
                            "Capability handover took {0}, expected less than {1}".format(
                                cap_waited, session_timeout / 2.0
                            ))

            cap_holder.stdin.close()
            try:
                cap_holder.wait()
            except (CommandFailedError, ConnectionLostError):
                # We killed it (and possibly its node), so it raises an error
                pass
        finally:
            self.mount_a.kill_cleanup()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

    def test_trim_caps(self):
        # Trim capability when reconnecting MDS
        # =====================================

        count = 500
        # Create lots of files
        for i in range(count):
            self.mount_a.run_shell(["touch", "f{0}".format(i)])

        # Populate mount_b's cache
        self.mount_b.run_shell(["ls", "-l"])

        client_id = self.mount_b.get_global_id()
        num_caps = self._session_num_caps(client_id)
        self.assertGreaterEqual(num_caps, count)

        # Restart MDS. client should trim its cache when reconnecting to the MDS
        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        num_caps = self._session_num_caps(client_id)
        self.assertLess(num_caps, count,
                        "should have less than {0} capabilities, have {1}".format(
                            count, num_caps
                        ))

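    # Only exercise flock if both client nodes have fuse >= 2.9 installed;
    # callers fall back to fcntl-only checks (or skip) when this returns False.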
    def _is_flockable(self):
        a_version_str = get_package_version(self.mount_a.client_remote, "fuse")
        b_version_str = get_package_version(self.mount_b.client_remote, "fuse")
        flock_version_str = "2.9"

        version_regex = re.compile(r"[0-9\.]+")
        a_result = version_regex.match(a_version_str)
        self.assertTrue(a_result)
        b_result = version_regex.match(b_version_str)
        self.assertTrue(b_result)
        a_version = version.StrictVersion(a_result.group())
        b_version = version.StrictVersion(b_result.group())
        flock_version = version.StrictVersion(flock_version_str)

        if a_version >= flock_version and b_version >= flock_version:
            log.info("flock locks are available")
            return True
        else:
            log.info("not testing flock locks, machines have versions {av} and {bv}".format(
                av=a_version_str, bv=b_version_str))
            return False

    def test_filelock(self):
        """
        Check that file lock doesn't get lost after an MDS restart
        """

        flockable = self._is_flockable()
        lock_holder = self.mount_a.lock_background(do_flock=flockable)

        self.mount_b.wait_for_visible("background_file-2")
        self.mount_b.check_filelock(do_flock=flockable)

        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        self.mount_b.check_filelock(do_flock=flockable)

        # Tear down the background process
        lock_holder.stdin.close()
        try:
            lock_holder.wait()
        except (CommandFailedError, ConnectionLostError):
            # We killed it, so it raises an error
            pass

    def test_filelock_eviction(self):
        """
        Check that file lock held by evicted client is given to
        waiting client.
        """
        if not self._is_flockable():
            self.skipTest("flock is not available")

        lock_holder = self.mount_a.lock_background()
        self.mount_b.wait_for_visible("background_file-2")
        self.mount_b.check_filelock()

        lock_taker = self.mount_b.lock_and_release()
        # Check the taker is waiting (doesn't get it immediately)
        time.sleep(2)
        self.assertFalse(lock_holder.finished)
        self.assertFalse(lock_taker.finished)

        try:
            mount_a_client_id = self.mount_a.get_global_id()
            self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])

            # Evicting mount_a should let mount_b's attempt to take the lock
            # succeed
            self.wait_until_true(lambda: lock_taker.finished, timeout=10)
        finally:
            # teardown() doesn't quite handle this case cleanly, so help it out
            self.mount_a.kill()
            self.mount_a.kill_cleanup()

        # Bring the client back
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

    def test_dir_fsync(self):
        self._test_fsync(True)

    def test_create_fsync(self):
        self._test_fsync(False)

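    # Shared body for test_dir_fsync/test_create_fsync: 'dirfsync' selects
    # whether the directory fd or the child file is fsync'd before the MDS
    # and the client are killed.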
    def _test_fsync(self, dirfsync):
        """
        That calls to fsync guarantee visibility of metadata to another
        client immediately after the fsyncing client dies.
        """

        # Leave this guy out until he's needed
        self.mount_b.umount_wait()

        # Create dir + child dentry on client A, and fsync the dir
        path = os.path.join(self.mount_a.mountpoint, "subdir")
        self.mount_a.run_python(
            dedent("""
                import os
                import time

                path = "{path}"

                print("Starting creation...")
                start = time.time()

                os.mkdir(path)
                dfd = os.open(path, os.O_DIRECTORY)

                fd = open(os.path.join(path, "childfile"), "w")
                print("Finished creation in {{0}}s".format(time.time() - start))

                print("Starting fsync...")
                start = time.time()
                if {dirfsync}:
                    os.fsync(dfd)
                else:
                    os.fsync(fd)
                print("Finished fsync in {{0}}s".format(time.time() - start))
            """.format(path=path, dirfsync=str(dirfsync)))
        )

        # Immediately kill the MDS and then client A
        self.fs.mds_stop()
        self.fs.mds_fail()
        self.mount_a.kill()
        self.mount_a.kill_cleanup()

        # Restart the MDS. Wait for it to come up, it'll have to time out in clientreplay
        self.fs.mds_restart()
        log.info("Waiting for reconnect...")
        self.fs.wait_for_state("up:reconnect")
        log.info("Waiting for active...")
        self.fs.wait_for_state("up:active", timeout=MDS_RESTART_GRACE + self.mds_reconnect_timeout)
        log.info("Reached active...")

        # Is the child dentry visible from mount B?
        self.mount_b.mount()
        self.mount_b.wait_until_mounted()
        self.mount_b.run_shell(["ls", "subdir/childfile"])

    def test_unmount_for_evicted_client(self):
        """Check that a client does not hang on unmount after it has been evicted."""
        mount_a_client_id = self.mount_a.get_global_id()
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])

        self.mount_a.umount_wait(require_clean=True, timeout=30)

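    # A SIGSTOP'd ceph-fuse client should be marked stale by the MDS; once it
    # is resumed its caps are invalid, so it must see metadata created by the
    # other client in the meantime.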
    def test_stale_renew(self):
        if not isinstance(self.mount_a, FuseMount):
            self.skipTest("Require FUSE client to handle signal STOP/CONT")

        session_timeout = self.fs.get_var("session_timeout")

        self.mount_a.run_shell(["mkdir", "testdir"])
        self.mount_a.run_shell(["touch", "testdir/file1"])
        # populate readdir cache
        self.mount_a.run_shell(["ls", "testdir"])
        self.mount_b.run_shell(["ls", "testdir"])

        # check if readdir cache is effective
        initial_readdirs = self.fs.mds_asok(['perf', 'dump', 'mds_server', 'req_readdir_latency'])
        self.mount_b.run_shell(["ls", "testdir"])
        current_readdirs = self.fs.mds_asok(['perf', 'dump', 'mds_server', 'req_readdir_latency'])
        self.assertEqual(current_readdirs, initial_readdirs)

        mount_b_gid = self.mount_b.get_global_id()
        mount_b_pid = self.mount_b.get_client_pid()
        # stop ceph-fuse process of mount_b
        self.mount_b.client_remote.run(args=["sudo", "kill", "-STOP", mount_b_pid])

        self.assert_session_state(mount_b_gid, "open")
        time.sleep(session_timeout * 1.5)  # Long enough for MDS to consider session stale

        self.mount_a.run_shell(["touch", "testdir/file2"])
        self.assert_session_state(mount_b_gid, "stale")

        # resume ceph-fuse process of mount_b
        self.mount_b.client_remote.run(args=["sudo", "kill", "-CONT", mount_b_pid])
        # Is the new file visible from mount_b? (caps become invalid after session stale)
        self.mount_b.run_shell(["ls", "testdir/file2"])

    def test_abort_conn(self):
        """
        Check that abort_conn() skips closing mds sessions.
        """
        if not isinstance(self.mount_a, FuseMount):
            self.skipTest("Testing libcephfs function")

        self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])
        session_timeout = self.fs.get_var("session_timeout")

        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        gid_str = self.mount_a.run_python(dedent("""
            import cephfs as libcephfs
            cephfs = libcephfs.LibCephFS(conffile='')
            cephfs.mount()
            client_id = cephfs.get_instance_id()
            cephfs.abort_conn()
            print(client_id)
            """)
        )
        gid = int(gid_str)

        self.assert_session_state(gid, "open")
        time.sleep(session_timeout * 1.5)  # Long enough for MDS to consider session stale
        self.assert_session_state(gid, "stale")

    def test_dont_mark_unresponsive_client_stale(self):
        """
        Test that an unresponsive client holding caps is not marked stale or
        evicted unless another client wants its caps.
        """
        if not isinstance(self.mount_a, FuseMount):
            self.skipTest("Require FUSE client to handle signal STOP/CONT")

        # XXX: To conduct this test we need at least two clients since a
        # single client is never evicted by the MDS.
        SESSION_TIMEOUT = 30
        SESSION_AUTOCLOSE = 50
        time_at_beg = time.time()
        mount_a_gid = self.mount_a.get_global_id()
        _ = self.mount_a.client_pid
        self.fs.set_var('session_timeout', SESSION_TIMEOUT)
        self.fs.set_var('session_autoclose', SESSION_AUTOCLOSE)
        self.assert_session_count(2, self.fs.mds_asok(['session', 'ls']))

        # test that a client holding a cap not required by any other client is
        # not marked stale when it becomes unresponsive.
        self.mount_a.run_shell(['mkdir', 'dir'])
        self.mount_a.send_signal('sigstop')
        time.sleep(SESSION_TIMEOUT + 2)
        self.assert_session_state(mount_a_gid, "open")

        # test that other clients have to wait to get the caps from the
        # unresponsive client until session_autoclose.
        self.mount_b.run_shell(['stat', 'dir'])
        self.assert_session_count(1, self.fs.mds_asok(['session', 'ls']))
        self.assertLess(time.time(), time_at_beg + SESSION_AUTOCLOSE)

        self.mount_a.send_signal('sigcont')

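    # The 'session config ... timeout' asok command doubles this client's
    # session timeout, so the killed client's session should survive past the
    # default timeout and only be removed after the longer, per-session one.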
    def test_config_session_timeout(self):
        self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])
        session_timeout = self.fs.get_var("session_timeout")
        mount_a_gid = self.mount_a.get_global_id()

        self.fs.mds_asok(['session', 'config', '%s' % mount_a_gid, 'timeout', '%s' % (session_timeout * 2)])

        self.mount_a.kill()

        self.assert_session_count(2)

        time.sleep(session_timeout * 1.5)
        self.assert_session_state(mount_a_gid, "open")

        time.sleep(session_timeout)
        self.assert_session_count(1)

        self.mount_a.kill_cleanup()

    def test_reconnect_after_blacklisted(self):
        """
        Test reconnecting after the client has been blacklisted.
        - writing to a fd that was opened before the blacklist should return -EBADF
        - reading/writing to a file with lost file locks should return -EIO
        - a read-only fd should continue to work
        """

        self.mount_a.umount_wait()

        if isinstance(self.mount_a, FuseMount):
            self.skipTest("Not implemented in FUSE client yet")
        else:
            try:
                self.mount_a.mount(mount_options=['recover_session=clean'])
            except CommandFailedError:
                self.mount_a.kill_cleanup()
                self.skipTest("Not implemented in current kernel")

        self.mount_a.wait_until_mounted()

        path = os.path.join(self.mount_a.mountpoint, 'testfile_reconnect_after_blacklisted')
        pyscript = dedent("""
            import os
            import sys
            import fcntl
            import errno
            import time

            fd1 = os.open("{path}.1", os.O_RDWR | os.O_CREAT, 0o666)
            fd2 = os.open("{path}.1", os.O_RDONLY)
            fd3 = os.open("{path}.2", os.O_RDWR | os.O_CREAT, 0o666)
            fd4 = os.open("{path}.2", os.O_RDONLY)

            os.write(fd1, b'content')
            os.read(fd2, 1)

            os.write(fd3, b'content')
            os.read(fd4, 1)
            fcntl.flock(fd4, fcntl.LOCK_SH | fcntl.LOCK_NB)

            print("blacklist")
            sys.stdout.flush()

            sys.stdin.readline()

            # wait for mds to close session
            time.sleep(10)

            # trigger 'open session' message. kclient relies on 'session reject' message
            # to detect whether it is blacklisted
            try:
                os.stat("{path}.1")
            except:
                pass

            # wait for auto reconnect
            time.sleep(10)

            try:
                os.write(fd1, b'content')
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
            else:
                raise RuntimeError("write() failed to raise error")

            os.read(fd2, 1)

            try:
                os.read(fd4, 1)
            except OSError as e:
                if e.errno != errno.EIO:
                    raise
            else:
                raise RuntimeError("read() failed to raise error")
            """).format(path=path)
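        # The script above prints "blacklist" and then blocks on stdin: read
        # that line, evict the client while it is blocked, then write a line
        # back so it can verify the post-reconnect fd behaviour.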
        rproc = self.mount_a.client_remote.run(
            args=['sudo', 'python3', '-c', pyscript],
            wait=False, stdin=run.PIPE, stdout=run.PIPE)

        rproc.stdout.readline()

        mount_a_client_id = self.mount_a.get_global_id()
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])

        rproc.stdin.writelines(['done\n'])
        rproc.stdin.flush()

        rproc.wait()
        self.assertEqual(rproc.exitstatus, 0)