
"""
Teuthology task for exercising CephFS client recovery
"""

import logging
from textwrap import dedent
import time
import distutils.version as version
import random
import re
import string
import os

from teuthology.orchestra import run
from teuthology.exceptions import CommandFailedError
from tasks.cephfs.fuse_mount import FuseMount
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.packaging import get_package_version

log = logging.getLogger(__name__)


# Arbitrary timeouts for operations involving restarting
# an MDS or waiting for it to come up
MDS_RESTART_GRACE = 60

class TestClientNetworkRecovery(CephFSTestCase):
    REQUIRE_ONE_CLIENT_REMOTE = True
    CLIENTS_REQUIRED = 2

    LOAD_SETTINGS = ["mds_reconnect_timeout", "ms_max_backoff"]

    # Environment references
    mds_reconnect_timeout = None
    ms_max_backoff = None

    def test_network_death(self):
        """
        Simulate software freeze or temporary network failure.

        Check that the client blocks I/O during failure, and completes
        I/O after failure.
        """

        session_timeout = self.fs.get_var("session_timeout")
        self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])

        # We only need one client
        self.mount_b.umount_wait()

        # Initially our one client session should be visible
        client_id = self.mount_a.get_global_id()
        ls_data = self._session_list()
        self.assert_session_count(1, ls_data)
        self.assertEqual(ls_data[0]['id'], client_id)
        self.assert_session_state(client_id, "open")

        # ...and capable of doing I/O without blocking
        self.mount_a.create_files()

        # ...but if we turn off the network
        self.fs.set_clients_block(True)

        # ...and try and start an I/O
        write_blocked = self.mount_a.write_background()

        # ...then it should block
        self.assertFalse(write_blocked.finished)
        self.assert_session_state(client_id, "open")
        time.sleep(session_timeout * 1.5)  # Long enough for MDS to consider session stale
        self.assertFalse(write_blocked.finished)
        self.assert_session_state(client_id, "stale")

        # ...until we re-enable I/O
        self.fs.set_clients_block(False)

        # ...when it should complete promptly
        a = time.time()
        self.wait_until_true(lambda: write_blocked.finished, self.ms_max_backoff * 2)
        write_blocked.wait()  # Already know we're finished, wait() to raise exception on errors
        recovery_time = time.time() - a
        log.info("recovery time: {0}".format(recovery_time))
        self.assert_session_state(client_id, "open")

class TestClientRecovery(CephFSTestCase):
    CLIENTS_REQUIRED = 2

    LOAD_SETTINGS = ["mds_reconnect_timeout", "ms_max_backoff"]

    # Environment references
    mds_reconnect_timeout = None
    ms_max_backoff = None

    def test_basic(self):
        # Check that two clients come up healthy and see each others' files
        # =====================================================
        self.mount_a.create_files()
        self.mount_a.check_files()
        self.mount_a.umount_wait()

        self.mount_b.check_files()

        self.mount_a.mount_wait()

        # Check that the admin socket interface is correctly reporting
        # two sessions
        # =====================================================
        ls_data = self._session_list()
        self.assert_session_count(2, ls_data)

        self.assertSetEqual(
            set([l['id'] for l in ls_data]),
            {self.mount_a.get_global_id(), self.mount_b.get_global_id()}
        )

    def test_restart(self):
        # Check that after an MDS restart both clients reconnect and continue
        # to handle I/O
        # =====================================================
        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        self.mount_a.create_destroy()
        self.mount_b.create_destroy()

    def _session_num_caps(self, client_id):
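        """
        Return the number of caps the MDS reports for the given client's
        session, as looked up in the 'session ls' admin socket output.
        """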
        ls_data = self.fs.mds_asok(['session', 'ls'])
        return int(self._session_by_id(ls_data).get(client_id, {'num_caps': None})['num_caps'])

    def test_reconnect_timeout(self):
        # Reconnect timeout
        # =================
        # Check that if I stop an MDS and a client goes away, the MDS waits
        # for the reconnect period

        mount_a_client_id = self.mount_a.get_global_id()

        self.fs.fail()

        self.mount_a.umount_wait(force=True)

        self.fs.set_joinable()

        self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
        # Check that the MDS locally reports its state correctly
        status = self.fs.mds_asok(['status'])
        self.assertIn("reconnect_status", status)

        ls_data = self._session_list()
        self.assert_session_count(2, ls_data)

        # The session for the dead client should have the 'reconnect' flag set
        self.assertTrue(self.get_session(mount_a_client_id)['reconnecting'])

        # Wait for the reconnect state to clear, this should take the
        # reconnect timeout period.
        in_reconnect_for = self.fs.wait_for_state('up:active', timeout=self.mds_reconnect_timeout * 2)
        # Check that the period we waited to enter active is within a factor
        # of two of the reconnect timeout.
        self.assertGreater(in_reconnect_for, self.mds_reconnect_timeout // 2,
                           "Should have been in reconnect phase for {0} but only took {1}".format(
                               self.mds_reconnect_timeout, in_reconnect_for
                           ))

        self.assert_session_count(1)

        # Check that the client that timed out during reconnect can
        # mount again and do I/O
        self.mount_a.mount_wait()
        self.mount_a.create_destroy()

        self.assert_session_count(2)

    def test_reconnect_eviction(self):
        # Eviction during reconnect
        # =========================
        mount_a_client_id = self.mount_a.get_global_id()

        self.fs.fail()

        # The mount goes away while the MDS is offline
        self.mount_a.kill()

        # wait for it to die
        time.sleep(5)

        self.fs.set_joinable()

        # Enter reconnect phase
        self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
        self.assert_session_count(2)

        # Evict the stuck client
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
        self.assert_session_count(1)

        # Observe that we proceed to active phase without waiting full reconnect timeout
        evict_til_active = self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)
        # Once we evict the troublemaker, the reconnect phase should complete
        # in well under the reconnect timeout.
        self.assertLess(evict_til_active, self.mds_reconnect_timeout * 0.5,
                        "reconnect did not complete soon enough after eviction, took {0}".format(
                            evict_til_active
                        ))

        # We killed earlier so must clean up before trying to use again
        self.mount_a.kill_cleanup()

        # Bring the client back
        self.mount_a.mount_wait()
        self.mount_a.create_destroy()

    def _test_stale_caps(self, write):
        session_timeout = self.fs.get_var("session_timeout")

        # Capability release from stale session
        # =====================================
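        # In write mode the background holder opens the file and writes to it;
        # in read mode it creates the file, remounts, and re-opens it read-only.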
        if write:
            content = ''.join(random.choices(string.ascii_uppercase + string.digits, k=16))
            cap_holder = self.mount_a.open_background(content=content)
        else:
            content = ''
            self.mount_a.run_shell(["touch", "background_file"])
            self.mount_a.umount_wait()
            self.mount_a.mount_wait()
            cap_holder = self.mount_a.open_background(write=False)

        self.assert_session_count(2)
        mount_a_gid = self.mount_a.get_global_id()

        # Wait for the file to be visible from another client, indicating
        # that mount_a has completed its network ops
        self.mount_b.wait_for_visible(size=len(content))

        # Simulate client death
        self.mount_a.suspend_netns()

        # wait for it to die so it doesn't voluntarily release buffer cap
        time.sleep(5)

        try:
            # Now, after session_timeout seconds, the waiter should
            # complete their operation when the MDS marks the holder's
            # session stale.
            cap_waiter = self.mount_b.write_background()
            a = time.time()
            cap_waiter.wait()
            b = time.time()

            # Should have succeeded
            self.assertEqual(cap_waiter.exitstatus, 0)

            if write:
                self.assert_session_count(1)
            else:
                self.assert_session_state(mount_a_gid, "stale")

            cap_waited = b - a
            log.info("cap_waiter waited {0}s".format(cap_waited))
            self.assertTrue(session_timeout / 2.0 <= cap_waited <= session_timeout * 2.0,
                            "Capability handover took {0}, expected approx {1}".format(
                                cap_waited, session_timeout
                            ))
        finally:
            self.mount_a.resume_netns()  # allow the mount to recover otherwise background proc is unkillable
            self.mount_a._kill_background(cap_holder)

    def test_stale_read_caps(self):
        self._test_stale_caps(False)

    def test_stale_write_caps(self):
        self._test_stale_caps(True)

    def test_evicted_caps(self):
        # Eviction while holding a capability
        # ===================================

        session_timeout = self.fs.get_var("session_timeout")

        # Take out a write capability on a file on client A,
        # and then immediately kill it.
        cap_holder = self.mount_a.open_background()
        mount_a_client_id = self.mount_a.get_global_id()

        # Wait for the file to be visible from another client, indicating
        # that mount_a has completed its network ops
        self.mount_b.wait_for_visible()

        # Simulate client death
        self.mount_a.suspend_netns()

        # wait for it to die so it doesn't voluntarily release buffer cap
        time.sleep(5)

        try:
            # The waiter should get stuck waiting for the capability
            # held on the MDS by the now-dead client A
            cap_waiter = self.mount_b.write_background()
            time.sleep(5)
            self.assertFalse(cap_waiter.finished)

            self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
            # Now, because I evicted the old holder of the capability, it should
            # immediately get handed over to the waiter
            a = time.time()
            cap_waiter.wait()
            b = time.time()
            cap_waited = b - a
            log.info("cap_waiter waited {0}s".format(cap_waited))
            # This is the check that it happened 'now' rather than waiting
            # for the session timeout
            self.assertLess(cap_waited, session_timeout / 2.0,
                            "Capability handover took {0}, expected less than {1}".format(
                                cap_waited, session_timeout / 2.0
                            ))

        finally:
            self.mount_a.resume_netns()  # allow the mount to recover otherwise background proc is unkillable
            self.mount_a._kill_background(cap_holder)

    def test_trim_caps(self):
        # Trim capability when reconnecting MDS
        # =====================================

        count = 500
        # Create lots of files
        for i in range(count):
            self.mount_a.run_shell(["touch", "f{0}".format(i)])

        # Populate mount_b's cache
        self.mount_b.run_shell(["ls", "-l"])

        client_id = self.mount_b.get_global_id()
        num_caps = self._session_num_caps(client_id)
        self.assertGreaterEqual(num_caps, count)

        # Restart MDS. The client should trim its cache when reconnecting to the MDS
        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        num_caps = self._session_num_caps(client_id)
        self.assertLess(num_caps, count,
                        "should have less than {0} capabilities, have {1}".format(
                            count, num_caps
                        ))

    def _is_flockable(self):
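        """
        Return True if the fuse package on both client remotes is new enough
        (>= 2.9) for the flock tests to be meaningful.
        """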
        a_version_str = get_package_version(self.mount_a.client_remote, "fuse")
        b_version_str = get_package_version(self.mount_b.client_remote, "fuse")
        flock_version_str = "2.9"

        version_regex = re.compile(r"[0-9\.]+")
        a_result = version_regex.match(a_version_str)
        self.assertTrue(a_result)
        b_result = version_regex.match(b_version_str)
        self.assertTrue(b_result)
        a_version = version.StrictVersion(a_result.group())
        b_version = version.StrictVersion(b_result.group())
        flock_version = version.StrictVersion(flock_version_str)

        if a_version >= flock_version and b_version >= flock_version:
            log.info("flock locks are available")
            return True
        else:
            log.info("not testing flock locks, machines have versions {av} and {bv}".format(
                av=a_version_str, bv=b_version_str))
            return False

    def test_filelock(self):
        """
        Check that a file lock doesn't get lost after an MDS restart
        """

        flockable = self._is_flockable()
        lock_holder = self.mount_a.lock_background(do_flock=flockable)

        self.mount_b.wait_for_visible("background_file-2")
        self.mount_b.check_filelock(do_flock=flockable)

        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        self.mount_b.check_filelock(do_flock=flockable)

        self.mount_a._kill_background(lock_holder)

    def test_filelock_eviction(self):
        """
        Check that a file lock held by an evicted client is given to the
        waiting client.
        """
        if not self._is_flockable():
            self.skipTest("flock is not available")

        lock_holder = self.mount_a.lock_background()
        self.mount_b.wait_for_visible("background_file-2")
        self.mount_b.check_filelock()

        lock_taker = self.mount_b.lock_and_release()
        # Check the taker is waiting (doesn't get it immediately)
        time.sleep(2)
        self.assertFalse(lock_holder.finished)
        self.assertFalse(lock_taker.finished)

        try:
            mount_a_client_id = self.mount_a.get_global_id()
            self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])

            # Evicting mount_a should let mount_b's attempt to take the lock
            # succeed
            self.wait_until_true(lambda: lock_taker.finished, timeout=10)
        finally:
            self.mount_a._kill_background(lock_holder)

        # teardown() doesn't quite handle this case cleanly, so help it out
        self.mount_a.kill()
        self.mount_a.kill_cleanup()

        # Bring the client back
        self.mount_a.mount_wait()

    def test_dir_fsync(self):
        self._test_fsync(True)

    def test_create_fsync(self):
        self._test_fsync(False)

    def _test_fsync(self, dirfsync):
        """
        That calls to fsync guarantee visibility of metadata to another
        client immediately after the fsyncing client dies.
        """

        # Leave this guy out until he's needed
        self.mount_b.umount_wait()

        # Create dir + child dentry on client A, and fsync the dir
        path = os.path.join(self.mount_a.mountpoint, "subdir")
        self.mount_a.run_python(
            dedent("""
                import os
                import time

                path = "{path}"

                print("Starting creation...")
                start = time.time()

                os.mkdir(path)
                dfd = os.open(path, os.O_DIRECTORY)

                fd = open(os.path.join(path, "childfile"), "w")
                print("Finished creation in {{0}}s".format(time.time() - start))

                print("Starting fsync...")
                start = time.time()
                if {dirfsync}:
                    os.fsync(dfd)
                else:
                    os.fsync(fd)
                print("Finished fsync in {{0}}s".format(time.time() - start))
            """.format(path=path, dirfsync=str(dirfsync)))
        )

        # Immediately kill the MDS and then client A
        self.fs.fail()
        self.mount_a.kill()
        self.mount_a.kill_cleanup()

        # Restart the MDS. Wait for it to come up, it'll have to time out in clientreplay
        self.fs.set_joinable()
        log.info("Waiting for reconnect...")
        self.fs.wait_for_state("up:reconnect")
        log.info("Waiting for active...")
        self.fs.wait_for_state("up:active", timeout=MDS_RESTART_GRACE + self.mds_reconnect_timeout)
        log.info("Reached active...")

        # Is the child dentry visible from mount B?
        self.mount_b.mount_wait()
        self.mount_b.run_shell(["ls", "subdir/childfile"])

    def test_unmount_for_evicted_client(self):
        """Test that the client does not hang on unmount after it has been evicted."""
        mount_a_client_id = self.mount_a.get_global_id()
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])

        self.mount_a.umount_wait(require_clean=True, timeout=30)

    def test_mount_after_evicted_client(self):
        """Test if a new mount of the same fs works after client eviction."""

        # trash this: we need it to use the same remote as mount_a
        self.mount_b.umount_wait()

        cl = self.mount_a.__class__

        # create a new instance of mount_a's class with most of the
        # same settings, but mounted on mount_b's mountpoint.
        m = cl(ctx=self.mount_a.ctx,
               client_config=self.mount_a.client_config,
               test_dir=self.mount_a.test_dir,
               client_id=self.mount_a.client_id,
               client_remote=self.mount_a.client_remote,
               client_keyring_path=self.mount_a.client_keyring_path,
               cephfs_name=self.mount_a.cephfs_name,
               cephfs_mntpt=self.mount_a.cephfs_mntpt,
               hostfs_mntpt=self.mount_b.hostfs_mntpt,
               brxnet=self.mount_a.ceph_brx_net)

        # evict mount_a
        mount_a_client_id = self.mount_a.get_global_id()
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])

        m.mount_wait()
        m.create_files()
        m.check_files()
        m.umount_wait(require_clean=True)

    def test_stale_renew(self):
        if not isinstance(self.mount_a, FuseMount):
            self.skipTest("Require FUSE client to handle signal STOP/CONT")

        session_timeout = self.fs.get_var("session_timeout")

        self.mount_a.run_shell(["mkdir", "testdir"])
        self.mount_a.run_shell(["touch", "testdir/file1"])
        # populate readdir cache
        self.mount_a.run_shell(["ls", "testdir"])
        self.mount_b.run_shell(["ls", "testdir"])

        # check if readdir cache is effective
        initial_readdirs = self.fs.mds_asok(['perf', 'dump', 'mds_server', 'req_readdir_latency'])
        self.mount_b.run_shell(["ls", "testdir"])
        current_readdirs = self.fs.mds_asok(['perf', 'dump', 'mds_server', 'req_readdir_latency'])
        self.assertEqual(current_readdirs, initial_readdirs)

        mount_b_gid = self.mount_b.get_global_id()
        # make mount_b unresponsive by suspending its network
        self.mount_b.suspend_netns()

        self.assert_session_state(mount_b_gid, "open")
        time.sleep(session_timeout * 1.5)  # Long enough for MDS to consider session stale

        self.mount_a.run_shell(["touch", "testdir/file2"])
        self.assert_session_state(mount_b_gid, "stale")

        # resume mount_b's network
        self.mount_b.resume_netns()
        # Is the new file visible from mount_b? (caps become invalid after session stale)
        self.mount_b.run_shell(["ls", "testdir/file2"])

    def test_abort_conn(self):
        """
        Check that abort_conn() skips closing mds sessions.
        """
        if not isinstance(self.mount_a, FuseMount):
            self.skipTest("Testing libcephfs function")

        self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])
        session_timeout = self.fs.get_var("session_timeout")

        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

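        # Mount via libcephfs directly and call abort_conn(), which drops the
        # connection without sending a session close; the MDS should keep the
        # session and eventually mark it stale rather than removing it.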
        gid_str = self.mount_a.run_python(dedent("""
            import cephfs as libcephfs
            cephfs = libcephfs.LibCephFS(conffile='')
            cephfs.mount()
            client_id = cephfs.get_instance_id()
            cephfs.abort_conn()
            print(client_id)
            """)
        )
        gid = int(gid_str)

        self.assert_session_state(gid, "open")
        time.sleep(session_timeout * 1.5)  # Long enough for MDS to consider session stale
        self.assert_session_state(gid, "stale")

    def test_dont_mark_unresponsive_client_stale(self):
        """
        Test that an unresponsive client holding caps is not marked stale or
        evicted unless another client wants its caps.
        """
        if not isinstance(self.mount_a, FuseMount):
            self.skipTest("Require FUSE client to handle signal STOP/CONT")

        # XXX: To conduct this test we need at least two clients since a
        # single client is never evicted by the MDS.
        SESSION_TIMEOUT = 30
        SESSION_AUTOCLOSE = 50
        time_at_beg = time.time()
        mount_a_gid = self.mount_a.get_global_id()
        _ = self.mount_a.client_pid
        self.fs.set_var('session_timeout', SESSION_TIMEOUT)
        self.fs.set_var('session_autoclose', SESSION_AUTOCLOSE)
        self.assert_session_count(2, self.fs.mds_asok(['session', 'ls']))

        # test that a client holding caps not required by any other client is not
        # marked stale when it becomes unresponsive.
        self.mount_a.run_shell(['mkdir', 'dir'])
        self.mount_a.send_signal('sigstop')
        time.sleep(SESSION_TIMEOUT + 2)
        self.assert_session_state(mount_a_gid, "open")

        # test that other clients have to wait to get the caps from the
        # unresponsive client until session_autoclose.
        self.mount_b.run_shell(['stat', 'dir'])
        self.assert_session_count(1, self.fs.mds_asok(['session', 'ls']))
        self.assertLess(time.time(), time_at_beg + SESSION_AUTOCLOSE)

        self.mount_a.send_signal('sigcont')

    def test_config_session_timeout(self):
        self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])
        session_timeout = self.fs.get_var("session_timeout")
        mount_a_gid = self.mount_a.get_global_id()

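        # Double this session's timeout via the admin socket, then kill the client:
        # the session should outlive the default timeout but still be removed once
        # the doubled timeout has elapsed.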
        self.fs.mds_asok(['session', 'config', '%s' % mount_a_gid, 'timeout', '%s' % (session_timeout * 2)])

        self.mount_a.kill()

        self.assert_session_count(2)

        time.sleep(session_timeout * 1.5)
        self.assert_session_state(mount_a_gid, "open")

        time.sleep(session_timeout)
        self.assert_session_count(1)

        self.mount_a.kill_cleanup()

    def test_reconnect_after_blocklisted(self):
        """
        Test reconnect after blocklisted.
        - writing to a fd that was opened before blocklist should return -EBADF
        - reading/writing to a file with lost file locks should return -EIO
        - readonly fd should continue to work
        """

        self.mount_a.umount_wait()

        if isinstance(self.mount_a, FuseMount):
            self.mount_a.mount_wait(mntargs=['--client_reconnect_stale=1', '--fuse_disable_pagecache=1'])
        else:
            try:
                self.mount_a.mount_wait(mntopts=['recover_session=clean'])
            except CommandFailedError:
                self.mount_a.kill_cleanup()
                self.skipTest("Not implemented in current kernel")

        self.mount_a.wait_until_mounted()

        path = os.path.join(self.mount_a.mountpoint, 'testfile_reconnect_after_blocklisted')
        pyscript = dedent("""
            import os
            import sys
            import fcntl
            import errno
            import time

            fd1 = os.open("{path}.1", os.O_RDWR | os.O_CREAT, 0o666)
            fd2 = os.open("{path}.1", os.O_RDONLY)
            fd3 = os.open("{path}.2", os.O_RDWR | os.O_CREAT, 0o666)
            fd4 = os.open("{path}.2", os.O_RDONLY)

            os.write(fd1, b'content')
            os.read(fd2, 1)

            os.write(fd3, b'content')
            os.read(fd4, 1)
            fcntl.flock(fd4, fcntl.LOCK_SH | fcntl.LOCK_NB)

            print("blocklist")
            sys.stdout.flush()

            sys.stdin.readline()

            # wait for mds to close session
            time.sleep(10)

            # trigger 'open session' message. kclient relies on 'session reject' message
            # to detect if itself is blocklisted
            try:
                os.stat("{path}.1")
            except:
                pass

            # wait for auto reconnect
            time.sleep(10)

            try:
                os.write(fd1, b'content')
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
            else:
                raise RuntimeError("write() failed to raise error")

            os.read(fd2, 1)

            try:
                os.read(fd4, 1)
            except OSError as e:
                if e.errno != errno.EIO:
                    raise
            else:
                raise RuntimeError("read() failed to raise error")
            """).format(path=path)
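        # Run the script on the client's remote: it prints "blocklist" once its fds
        # are set up, then blocks on stdin until we have evicted (blocklisted) the
        # session below.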
        rproc = self.mount_a.client_remote.run(
            args=['python3', '-c', pyscript],
            wait=False, stdin=run.PIPE, stdout=run.PIPE)

        rproc.stdout.readline()

        mount_a_client_id = self.mount_a.get_global_id()
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])

        rproc.stdin.writelines(['done\n'])
        rproc.stdin.flush()

        rproc.wait()
        self.assertEqual(rproc.exitstatus, 0)

    def test_refuse_client_session(self):
        """
        Test that a client cannot start a session when the file system flag
        refuse_client_session is set
        """

        self.mount_a.umount_wait()
        self.fs.set_refuse_client_session(True)
        with self.assertRaises(CommandFailedError):
            self.mount_a.mount_wait()

    def test_refuse_client_session_on_reconnect(self):
        """
        Test that a client cannot reconnect when the file system comes back
        online and the file system flag refuse_client_session is set
        """

        self.mount_a.create_files()
        self.mount_a.check_files()

        self.fs.fail()
        self.fs.set_refuse_client_session(True)
        self.fs.set_joinable()
        with self.assert_cluster_log('client could not reconnect as'
                                     ' file system flag'
                                     ' refuse_client_session is set'):
            time.sleep(self.fs.get_var("session_timeout") * 1.5)
        self.assertEqual(len(self.fs.mds_tell(["session", "ls"])), 0)
        self.mount_a.umount_wait(force=True)