3 Teuthology task for exercising CephFS client recovery
7 from textwrap
import dedent
9 import distutils
.version
as version
13 from teuthology
.orchestra
import run
14 from teuthology
.orchestra
.run
import CommandFailedError
, ConnectionLostError
15 from tasks
.cephfs
.fuse_mount
import FuseMount
16 from tasks
.cephfs
.cephfs_test_case
import CephFSTestCase
17 from teuthology
.packaging
import get_package_version
19 log
= logging
.getLogger(__name__
)
22 # Arbitrary timeouts for operations involving restarting
23 # an MDS or waiting for it to come up
24 MDS_RESTART_GRACE
= 60
27 class TestClientNetworkRecovery(CephFSTestCase
):
28 REQUIRE_KCLIENT_REMOTE
= True
29 REQUIRE_ONE_CLIENT_REMOTE
= True
32 LOAD_SETTINGS
= ["mds_reconnect_timeout", "ms_max_backoff"]
34 # Environment references
35 mds_reconnect_timeout
= None
38 def test_network_death(self
):
40 Simulate software freeze or temporary network failure.
42 Check that the client blocks I/O during failure, and completes
46 session_timeout
= self
.fs
.get_var("session_timeout")
47 self
.fs
.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])
49 # We only need one client
50 self
.mount_b
.umount_wait()
52 # Initially our one client session should be visible
53 client_id
= self
.mount_a
.get_global_id()
54 ls_data
= self
._session
_list
()
55 self
.assert_session_count(1, ls_data
)
56 self
.assertEqual(ls_data
[0]['id'], client_id
)
57 self
.assert_session_state(client_id
, "open")
59 # ...and capable of doing I/O without blocking
60 self
.mount_a
.create_files()
62 # ...but if we turn off the network
63 self
.fs
.set_clients_block(True)
65 # ...and try and start an I/O
66 write_blocked
= self
.mount_a
.write_background()
68 # ...then it should block
69 self
.assertFalse(write_blocked
.finished
)
70 self
.assert_session_state(client_id
, "open")
71 time
.sleep(session_timeout
* 1.5) # Long enough for MDS to consider session stale
72 self
.assertFalse(write_blocked
.finished
)
73 self
.assert_session_state(client_id
, "stale")
75 # ...until we re-enable I/O
76 self
.fs
.set_clients_block(False)
78 # ...when it should complete promptly
80 self
.wait_until_true(lambda: write_blocked
.finished
, self
.ms_max_backoff
* 2)
81 write_blocked
.wait() # Already know we're finished, wait() to raise exception on errors
82 recovery_time
= time
.time() - a
83 log
.info("recovery time: {0}".format(recovery_time
))
84 self
.assert_session_state(client_id
, "open")
87 class TestClientRecovery(CephFSTestCase
):
88 REQUIRE_KCLIENT_REMOTE
= True
91 LOAD_SETTINGS
= ["mds_reconnect_timeout", "ms_max_backoff"]
93 # Environment references
94 mds_reconnect_timeout
= None
98 # Check that two clients come up healthy and see each others' files
99 # =====================================================
100 self
.mount_a
.create_files()
101 self
.mount_a
.check_files()
102 self
.mount_a
.umount_wait()
104 self
.mount_b
.check_files()
107 self
.mount_a
.wait_until_mounted()
109 # Check that the admin socket interface is correctly reporting
111 # =====================================================
112 ls_data
= self
._session
_list
()
113 self
.assert_session_count(2, ls_data
)
116 set([l
['id'] for l
in ls_data
]),
117 {self
.mount_a
.get_global_id(), self
.mount_b
.get_global_id()}
120 def test_restart(self
):
121 # Check that after an MDS restart both clients reconnect and continue
123 # =====================================================
124 self
.fs
.mds_fail_restart()
125 self
.fs
.wait_for_state('up:active', timeout
=MDS_RESTART_GRACE
)
127 self
.mount_a
.create_destroy()
128 self
.mount_b
.create_destroy()
130 def _session_num_caps(self
, client_id
):
131 ls_data
= self
.fs
.mds_asok(['session', 'ls'])
132 return int(self
._session
_by
_id
(ls_data
).get(client_id
, {'num_caps': None})['num_caps'])
134 def test_reconnect_timeout(self
):
137 # Check that if I stop an MDS and a client goes away, the MDS waits
138 # for the reconnect period
142 mount_a_client_id
= self
.mount_a
.get_global_id()
143 self
.mount_a
.umount_wait(force
=True)
145 self
.fs
.mds_restart()
147 self
.fs
.wait_for_state('up:reconnect', reject
='up:active', timeout
=MDS_RESTART_GRACE
)
148 # Check that the MDS locally reports its state correctly
149 status
= self
.fs
.mds_asok(['status'])
150 self
.assertIn("reconnect_status", status
)
152 ls_data
= self
._session
_list
()
153 self
.assert_session_count(2, ls_data
)
155 # The session for the dead client should have the 'reconnect' flag set
156 self
.assertTrue(self
.get_session(mount_a_client_id
)['reconnecting'])
158 # Wait for the reconnect state to clear, this should take the
159 # reconnect timeout period.
160 in_reconnect_for
= self
.fs
.wait_for_state('up:active', timeout
=self
.mds_reconnect_timeout
* 2)
161 # Check that the period we waited to enter active is within a factor
162 # of two of the reconnect timeout.
163 self
.assertGreater(in_reconnect_for
, self
.mds_reconnect_timeout
/ 2,
164 "Should have been in reconnect phase for {0} but only took {1}".format(
165 self
.mds_reconnect_timeout
, in_reconnect_for
168 self
.assert_session_count(1)
170 # Check that the client that timed out during reconnect can
171 # mount again and do I/O
173 self
.mount_a
.wait_until_mounted()
174 self
.mount_a
.create_destroy()
176 self
.assert_session_count(2)
178 def test_reconnect_eviction(self
):
179 # Eviction during reconnect
180 # =========================
181 mount_a_client_id
= self
.mount_a
.get_global_id()
186 # The mount goes away while the MDS is offline
192 self
.fs
.mds_restart()
194 # Enter reconnect phase
195 self
.fs
.wait_for_state('up:reconnect', reject
='up:active', timeout
=MDS_RESTART_GRACE
)
196 self
.assert_session_count(2)
198 # Evict the stuck client
199 self
.fs
.mds_asok(['session', 'evict', "%s" % mount_a_client_id
])
200 self
.assert_session_count(1)
202 # Observe that we proceed to active phase without waiting full reconnect timeout
203 evict_til_active
= self
.fs
.wait_for_state('up:active', timeout
=MDS_RESTART_GRACE
)
204 # Once we evict the troublemaker, the reconnect phase should complete
205 # in well under the reconnect timeout.
206 self
.assertLess(evict_til_active
, self
.mds_reconnect_timeout
* 0.5,
207 "reconnect did not complete soon enough after eviction, took {0}".format(
211 # We killed earlier so must clean up before trying to use again
212 self
.mount_a
.kill_cleanup()
214 # Bring the client back
216 self
.mount_a
.wait_until_mounted()
217 self
.mount_a
.create_destroy()
219 def _test_stale_caps(self
, write
):
220 session_timeout
= self
.fs
.get_var("session_timeout")
222 # Capability release from stale session
223 # =====================================
225 cap_holder
= self
.mount_a
.open_background()
227 self
.mount_a
.run_shell(["touch", "background_file"])
228 self
.mount_a
.umount_wait()
230 self
.mount_a
.wait_until_mounted()
231 cap_holder
= self
.mount_a
.open_background(write
=False)
233 self
.assert_session_count(2)
234 mount_a_gid
= self
.mount_a
.get_global_id()
236 # Wait for the file to be visible from another client, indicating
237 # that mount_a has completed its network ops
238 self
.mount_b
.wait_for_visible()
240 # Simulate client death
243 # wait for it to die so it doesn't voluntarily release buffer cap
247 # Now, after session_timeout seconds, the waiter should
248 # complete their operation when the MDS marks the holder's
250 cap_waiter
= self
.mount_b
.write_background()
255 # Should have succeeded
256 self
.assertEqual(cap_waiter
.exitstatus
, 0)
259 self
.assert_session_count(1)
261 self
.assert_session_state(mount_a_gid
, "stale")
264 log
.info("cap_waiter waited {0}s".format(cap_waited
))
265 self
.assertTrue(session_timeout
/ 2.0 <= cap_waited
<= session_timeout
* 2.0,
266 "Capability handover took {0}, expected approx {1}".format(
267 cap_waited
, session_timeout
270 cap_holder
.stdin
.close()
273 except (CommandFailedError
, ConnectionLostError
):
274 # We killed it (and possibly its node), so it raises an error
277 # teardown() doesn't quite handle this case cleanly, so help it out
278 self
.mount_a
.kill_cleanup()
281 self
.mount_a
.wait_until_mounted()
283 def test_stale_read_caps(self
):
284 self
._test
_stale
_caps
(False)
286 def test_stale_write_caps(self
):
287 self
._test
_stale
_caps
(True)
289 def test_evicted_caps(self
):
290 # Eviction while holding a capability
291 # ===================================
293 session_timeout
= self
.fs
.get_var("session_timeout")
295 # Take out a write capability on a file on client A,
296 # and then immediately kill it.
297 cap_holder
= self
.mount_a
.open_background()
298 mount_a_client_id
= self
.mount_a
.get_global_id()
300 # Wait for the file to be visible from another client, indicating
301 # that mount_a has completed its network ops
302 self
.mount_b
.wait_for_visible()
304 # Simulate client death
307 # wait for it to die so it doesn't voluntarily release buffer cap
311 # The waiter should get stuck waiting for the capability
312 # held on the MDS by the now-dead client A
313 cap_waiter
= self
.mount_b
.write_background()
315 self
.assertFalse(cap_waiter
.finished
)
317 self
.fs
.mds_asok(['session', 'evict', "%s" % mount_a_client_id
])
318 # Now, because I evicted the old holder of the capability, it should
319 # immediately get handed over to the waiter
324 log
.info("cap_waiter waited {0}s".format(cap_waited
))
325 # This is the check that it happened 'now' rather than waiting
326 # for the session timeout
327 self
.assertLess(cap_waited
, session_timeout
/ 2.0,
328 "Capability handover took {0}, expected less than {1}".format(
329 cap_waited
, session_timeout
/ 2.0
332 cap_holder
.stdin
.close()
335 except (CommandFailedError
, ConnectionLostError
):
336 # We killed it (and possibly its node), so it raises an error
339 self
.mount_a
.kill_cleanup()
342 self
.mount_a
.wait_until_mounted()
344 def test_trim_caps(self
):
345 # Trim capability when reconnecting MDS
346 # ===================================
349 # Create lots of files
350 for i
in range(count
):
351 self
.mount_a
.run_shell(["touch", "f{0}".format(i
)])
353 # Populate mount_b's cache
354 self
.mount_b
.run_shell(["ls", "-l"])
356 client_id
= self
.mount_b
.get_global_id()
357 num_caps
= self
._session
_num
_caps
(client_id
)
358 self
.assertGreaterEqual(num_caps
, count
)
360 # Restart MDS. client should trim its cache when reconnecting to the MDS
361 self
.fs
.mds_fail_restart()
362 self
.fs
.wait_for_state('up:active', timeout
=MDS_RESTART_GRACE
)
364 num_caps
= self
._session
_num
_caps
(client_id
)
365 self
.assertLess(num_caps
, count
,
366 "should have less than {0} capabilities, have {1}".format(
370 def _is_flockable(self
):
371 a_version_str
= get_package_version(self
.mount_a
.client_remote
, "fuse")
372 b_version_str
= get_package_version(self
.mount_b
.client_remote
, "fuse")
373 flock_version_str
= "2.9"
375 version_regex
= re
.compile(r
"[0-9\.]+")
376 a_result
= version_regex
.match(a_version_str
)
377 self
.assertTrue(a_result
)
378 b_result
= version_regex
.match(b_version_str
)
379 self
.assertTrue(b_result
)
380 a_version
= version
.StrictVersion(a_result
.group())
381 b_version
= version
.StrictVersion(b_result
.group())
382 flock_version
=version
.StrictVersion(flock_version_str
)
384 if (a_version
>= flock_version
and b_version
>= flock_version
):
385 log
.info("flock locks are available")
388 log
.info("not testing flock locks, machines have versions {av} and {bv}".format(
389 av
=a_version_str
,bv
=b_version_str
))
392 def test_filelock(self
):
394 Check that file lock doesn't get lost after an MDS restart
397 flockable
= self
._is
_flockable
()
398 lock_holder
= self
.mount_a
.lock_background(do_flock
=flockable
)
400 self
.mount_b
.wait_for_visible("background_file-2")
401 self
.mount_b
.check_filelock(do_flock
=flockable
)
403 self
.fs
.mds_fail_restart()
404 self
.fs
.wait_for_state('up:active', timeout
=MDS_RESTART_GRACE
)
406 self
.mount_b
.check_filelock(do_flock
=flockable
)
408 # Tear down the background process
409 lock_holder
.stdin
.close()
412 except (CommandFailedError
, ConnectionLostError
):
413 # We killed it, so it raises an error
416 def test_filelock_eviction(self
):
418 Check that file lock held by evicted client is given to
421 if not self
._is
_flockable
():
422 self
.skipTest("flock is not available")
424 lock_holder
= self
.mount_a
.lock_background()
425 self
.mount_b
.wait_for_visible("background_file-2")
426 self
.mount_b
.check_filelock()
428 lock_taker
= self
.mount_b
.lock_and_release()
429 # Check the taker is waiting (doesn't get it immediately)
431 self
.assertFalse(lock_holder
.finished
)
432 self
.assertFalse(lock_taker
.finished
)
435 mount_a_client_id
= self
.mount_a
.get_global_id()
436 self
.fs
.mds_asok(['session', 'evict', "%s" % mount_a_client_id
])
438 # Evicting mount_a should let mount_b's attempt to take the lock
440 self
.wait_until_true(lambda: lock_taker
.finished
, timeout
=10)
442 # teardown() doesn't quite handle this case cleanly, so help it out
444 self
.mount_a
.kill_cleanup()
446 # Bring the client back
448 self
.mount_a
.wait_until_mounted()
450 def test_dir_fsync(self
):
451 self
._test
_fsync
(True);
453 def test_create_fsync(self
):
454 self
._test
_fsync
(False);
456 def _test_fsync(self
, dirfsync
):
458 That calls to fsync guarantee visibility of metadata to another
459 client immediately after the fsyncing client dies.
462 # Leave this guy out until he's needed
463 self
.mount_b
.umount_wait()
465 # Create dir + child dentry on client A, and fsync the dir
466 path
= os
.path
.join(self
.mount_a
.mountpoint
, "subdir")
467 self
.mount_a
.run_python(
474 print("Starting creation...")
478 dfd = os.open(path, os.O_DIRECTORY)
480 fd = open(os.path.join(path, "childfile"), "w")
481 print("Finished creation in {{0}}s".format(time.time() - start))
483 print("Starting fsync...")
489 print("Finished fsync in {{0}}s".format(time.time() - start))
490 """.format(path
=path
,dirfsync
=str(dirfsync
)))
493 # Immediately kill the MDS and then client A
497 self
.mount_a
.kill_cleanup()
499 # Restart the MDS. Wait for it to come up, it'll have to time out in clientreplay
500 self
.fs
.mds_restart()
501 log
.info("Waiting for reconnect...")
502 self
.fs
.wait_for_state("up:reconnect")
503 log
.info("Waiting for active...")
504 self
.fs
.wait_for_state("up:active", timeout
=MDS_RESTART_GRACE
+ self
.mds_reconnect_timeout
)
505 log
.info("Reached active...")
507 # Is the child dentry visible from mount B?
509 self
.mount_b
.wait_until_mounted()
510 self
.mount_b
.run_shell(["ls", "subdir/childfile"])
512 def test_unmount_for_evicted_client(self
):
513 """Test if client hangs on unmount after evicting the client."""
514 mount_a_client_id
= self
.mount_a
.get_global_id()
515 self
.fs
.mds_asok(['session', 'evict', "%s" % mount_a_client_id
])
517 self
.mount_a
.umount_wait(require_clean
=True, timeout
=30)
519 def test_stale_renew(self
):
520 if not isinstance(self
.mount_a
, FuseMount
):
521 self
.skipTest("Require FUSE client to handle signal STOP/CONT")
523 session_timeout
= self
.fs
.get_var("session_timeout")
525 self
.mount_a
.run_shell(["mkdir", "testdir"])
526 self
.mount_a
.run_shell(["touch", "testdir/file1"])
527 # populate readdir cache
528 self
.mount_a
.run_shell(["ls", "testdir"])
529 self
.mount_b
.run_shell(["ls", "testdir"])
531 # check if readdir cache is effective
532 initial_readdirs
= self
.fs
.mds_asok(['perf', 'dump', 'mds_server', 'req_readdir_latency'])
533 self
.mount_b
.run_shell(["ls", "testdir"])
534 current_readdirs
= self
.fs
.mds_asok(['perf', 'dump', 'mds_server', 'req_readdir_latency'])
535 self
.assertEqual(current_readdirs
, initial_readdirs
);
537 mount_b_gid
= self
.mount_b
.get_global_id()
538 mount_b_pid
= self
.mount_b
.get_client_pid()
539 # stop ceph-fuse process of mount_b
540 self
.mount_b
.client_remote
.run(args
=["sudo", "kill", "-STOP", mount_b_pid
])
542 self
.assert_session_state(mount_b_gid
, "open")
543 time
.sleep(session_timeout
* 1.5) # Long enough for MDS to consider session stale
545 self
.mount_a
.run_shell(["touch", "testdir/file2"])
546 self
.assert_session_state(mount_b_gid
, "stale")
548 # resume ceph-fuse process of mount_b
549 self
.mount_b
.client_remote
.run(args
=["sudo", "kill", "-CONT", mount_b_pid
])
550 # Is the new file visible from mount_b? (caps become invalid after session stale)
551 self
.mount_b
.run_shell(["ls", "testdir/file2"])
553 def test_abort_conn(self
):
555 Check that abort_conn() skips closing mds sessions.
557 if not isinstance(self
.mount_a
, FuseMount
):
558 self
.skipTest("Testing libcephfs function")
560 self
.fs
.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])
561 session_timeout
= self
.fs
.get_var("session_timeout")
563 self
.mount_a
.umount_wait()
564 self
.mount_b
.umount_wait()
566 gid_str
= self
.mount_a
.run_python(dedent("""
567 import cephfs as libcephfs
568 cephfs = libcephfs.LibCephFS(conffile='')
570 client_id = cephfs.get_instance_id()
577 self
.assert_session_state(gid
, "open")
578 time
.sleep(session_timeout
* 1.5) # Long enough for MDS to consider session stale
579 self
.assert_session_state(gid
, "stale")
581 def test_dont_mark_unresponsive_client_stale(self
):
583 Test that an unresponsive client holding caps is not marked stale or
584 evicted unless another clients wants its caps.
586 if not isinstance(self
.mount_a
, FuseMount
):
587 self
.skipTest("Require FUSE client to handle signal STOP/CONT")
589 # XXX: To conduct this test we need at least two clients since a
590 # single client is never evcited by MDS.
592 SESSION_AUTOCLOSE
= 50
593 time_at_beg
= time
.time()
594 mount_a_gid
= self
.mount_a
.get_global_id()
595 _
= self
.mount_a
.client_pid
596 self
.fs
.set_var('session_timeout', SESSION_TIMEOUT
)
597 self
.fs
.set_var('session_autoclose', SESSION_AUTOCLOSE
)
598 self
.assert_session_count(2, self
.fs
.mds_asok(['session', 'ls']))
600 # test that client holding cap not required by any other client is not
601 # marked stale when it becomes unresponsive.
602 self
.mount_a
.run_shell(['mkdir', 'dir'])
603 self
.mount_a
.send_signal('sigstop')
604 time
.sleep(SESSION_TIMEOUT
+ 2)
605 self
.assert_session_state(mount_a_gid
, "open")
607 # test that other clients have to wait to get the caps from
608 # unresponsive client until session_autoclose.
609 self
.mount_b
.run_shell(['stat', 'dir'])
610 self
.assert_session_count(1, self
.fs
.mds_asok(['session', 'ls']))
611 self
.assertLess(time
.time(), time_at_beg
+ SESSION_AUTOCLOSE
)
613 self
.mount_a
.send_signal('sigcont')
615 def test_config_session_timeout(self
):
616 self
.fs
.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])
617 session_timeout
= self
.fs
.get_var("session_timeout")
618 mount_a_gid
= self
.mount_a
.get_global_id()
620 self
.fs
.mds_asok(['session', 'config', '%s' % mount_a_gid
, 'timeout', '%s' % (session_timeout
* 2)])
624 self
.assert_session_count(2)
626 time
.sleep(session_timeout
* 1.5)
627 self
.assert_session_state(mount_a_gid
, "open")
629 time
.sleep(session_timeout
)
630 self
.assert_session_count(1)
632 self
.mount_a
.kill_cleanup()
634 def test_reconnect_after_blacklisted(self
):
636 Test reconnect after blacklisted.
637 - writing to a fd that was opened before blacklist should return -EBADF
638 - reading/writing to a file with lost file locks should return -EIO
639 - readonly fd should continue to work
642 self
.mount_a
.umount_wait()
644 if isinstance(self
.mount_a
, FuseMount
):
645 self
.skipTest("Not implemented in FUSE client yet")
648 self
.mount_a
.mount(mount_options
=['recover_session=clean'])
649 except CommandFailedError
:
650 self
.mount_a
.kill_cleanup()
651 self
.skipTest("Not implemented in current kernel")
653 self
.mount_a
.wait_until_mounted()
655 path
= os
.path
.join(self
.mount_a
.mountpoint
, 'testfile_reconnect_after_blacklisted')
656 pyscript
= dedent("""
663 fd1 = os.open("{path}.1", os.O_RDWR | os.O_CREAT, 0O666)
664 fd2 = os.open("{path}.1", os.O_RDONLY)
665 fd3 = os.open("{path}.2", os.O_RDWR | os.O_CREAT, 0O666)
666 fd4 = os.open("{path}.2", os.O_RDONLY)
668 os.write(fd1, b'content')
671 os.write(fd3, b'content')
673 fcntl.flock(fd4, fcntl.LOCK_SH | fcntl.LOCK_NB)
680 # wait for mds to close session
683 # trigger 'open session' message. kclient relies on 'session reject' message
684 # to detect if itself is blacklisted
690 # wait for auto reconnect
694 os.write(fd1, b'content')
696 if e.errno != errno.EBADF:
699 raise RuntimeError("write() failed to raise error")
706 if e.errno != errno.EIO:
709 raise RuntimeError("read() failed to raise error")
710 """).format(path
=path
)
711 rproc
= self
.mount_a
.client_remote
.run(
712 args
=['sudo', 'python3', '-c', pyscript
],
713 wait
=False, stdin
=run
.PIPE
, stdout
=run
.PIPE
)
715 rproc
.stdout
.readline()
717 mount_a_client_id
= self
.mount_a
.get_global_id()
718 self
.fs
.mds_asok(['session', 'evict', "%s" % mount_a_client_id
])
720 rproc
.stdin
.writelines(['done\n'])
724 self
.assertEqual(rproc
.exitstatus
, 0)