"""
Teuthology task for exercising CephFS client recovery
"""

import logging
import os
import re
import time
from textwrap import dedent
import distutils.version as version

from teuthology.orchestra.run import CommandFailedError, ConnectionLostError
from tasks.cephfs.fuse_mount import FuseMount
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.packaging import get_package_version
from unittest import SkipTest

log = logging.getLogger(__name__)


# Arbitrary timeouts for operations involving restarting
# an MDS or waiting for it to come up
MDS_RESTART_GRACE = 60


class TestClientNetworkRecovery(CephFSTestCase):
    REQUIRE_KCLIENT_REMOTE = True
    REQUIRE_ONE_CLIENT_REMOTE = True
    CLIENTS_REQUIRED = 2

    LOAD_SETTINGS = ["mds_reconnect_timeout", "ms_max_backoff"]

    # Environment references
    mds_reconnect_timeout = None
    ms_max_backoff = None
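    # (Note: CephFSTestCase is assumed to read each name in LOAD_SETTINGS from
    # the running cluster's configuration during setUp and bind it to the
    # attribute of the same name, replacing the None placeholders above.)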

    def test_network_death(self):
        """
        Simulate software freeze or temporary network failure.

        Check that the client blocks I/O during failure, and completes
        I/O after failure.
        """

        session_timeout = self.fs.get_var("session_timeout")
        self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])

        # We only need one client
        self.mount_b.umount_wait()

        # Initially our one client session should be visible
        client_id = self.mount_a.get_global_id()
        ls_data = self._session_list()
        self.assert_session_count(1, ls_data)
        self.assertEqual(ls_data[0]['id'], client_id)
        self.assert_session_state(client_id, "open")

        # ...and capable of doing I/O without blocking
        self.mount_a.create_files()

        # ...but if we turn off the network
        self.fs.set_clients_block(True)

        # ...and try to start an I/O
        write_blocked = self.mount_a.write_background()

        # ...then it should block
        self.assertFalse(write_blocked.finished)
        self.assert_session_state(client_id, "open")
        time.sleep(session_timeout * 1.5)  # Long enough for MDS to consider session stale
        self.assertFalse(write_blocked.finished)
        self.assert_session_state(client_id, "stale")

        # ...until we re-enable I/O
        self.fs.set_clients_block(False)

        # ...when it should complete promptly
        a = time.time()
        self.wait_until_true(lambda: write_blocked.finished, self.ms_max_backoff * 2)
        write_blocked.wait()  # Already know we're finished, wait() to raise exception on errors
        recovery_time = time.time() - a
        log.info("recovery time: {0}".format(recovery_time))
        self.assert_session_state(client_id, "open")


class TestClientRecovery(CephFSTestCase):
    REQUIRE_KCLIENT_REMOTE = True
    CLIENTS_REQUIRED = 2

    LOAD_SETTINGS = ["mds_reconnect_timeout", "ms_max_backoff"]

    # Environment references
    mds_reconnect_timeout = None
    ms_max_backoff = None

    def test_basic(self):
        # Check that two clients come up healthy and see each others' files
        # =====================================================
        self.mount_a.create_files()
        self.mount_a.check_files()
        self.mount_a.umount_wait()

        self.mount_b.check_files()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # Check that the admin socket interface is correctly reporting
        # two sessions
        # =====================================================
        ls_data = self._session_list()
        self.assert_session_count(2, ls_data)

        self.assertSetEqual(
            set([l['id'] for l in ls_data]),
            {self.mount_a.get_global_id(), self.mount_b.get_global_id()}
        )

    def test_restart(self):
        # Check that after an MDS restart both clients reconnect and continue
        # to handle I/O
        # =====================================================
        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        self.mount_a.create_destroy()
        self.mount_b.create_destroy()
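
    # (Note: the helper below reports how many capabilities the MDS holds for
    # a given client session; _session_by_id is assumed to be a CephFSTestCase
    # helper that keys the 'session ls' output by session id.)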
    def _session_num_caps(self, client_id):
        ls_data = self.fs.mds_asok(['session', 'ls'])
        return int(self._session_by_id(ls_data).get(client_id, {'num_caps': None})['num_caps'])

    def test_reconnect_timeout(self):
        # Reconnect timeout
        # =================
        # Check that if I stop an MDS and a client goes away, the MDS waits
        # for the reconnect period
        self.fs.mds_stop()
        self.fs.mds_fail()

        mount_a_client_id = self.mount_a.get_global_id()
        self.mount_a.umount_wait(force=True)

        self.fs.mds_restart()

        self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
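        # (Note: while in up:reconnect the MDS waits up to mds_reconnect_timeout
        # for previously-connected clients to return; the client we forcibly
        # unmounted above never will, so the phase should run its full course.)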

        # Check that the MDS locally reports its state correctly
        status = self.fs.mds_asok(['status'])
        self.assertIn("reconnect_status", status)

        ls_data = self._session_list()
        self.assert_session_count(2, ls_data)

        # The session for the dead client should have the 'reconnect' flag set
        self.assertTrue(self.get_session(mount_a_client_id)['reconnecting'])

        # Wait for the reconnect state to clear, this should take the
        # reconnect timeout period.
        in_reconnect_for = self.fs.wait_for_state('up:active', timeout=self.mds_reconnect_timeout * 2)
        # Check that the period we waited to enter active is within a factor
        # of two of the reconnect timeout.
        self.assertGreater(in_reconnect_for, self.mds_reconnect_timeout / 2,
                           "Should have been in reconnect phase for {0} but only took {1}".format(
                               self.mds_reconnect_timeout, in_reconnect_for
                           ))

        self.assert_session_count(1)

        # Check that the client that timed out during reconnect can
        # mount again and do I/O
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        self.mount_a.create_destroy()

        self.assert_session_count(2)

    def test_reconnect_eviction(self):
        # Eviction during reconnect
        # =========================
        mount_a_client_id = self.mount_a.get_global_id()

        self.fs.mds_stop()
        self.fs.mds_fail()

        # The mount goes away while the MDS is offline
        self.mount_a.kill()

        self.fs.mds_restart()

        # Enter reconnect phase
        self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
        self.assert_session_count(2)

        # Evict the stuck client
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
        self.assert_session_count(1)

        # Observe that we proceed to active phase without waiting full reconnect timeout
        evict_til_active = self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)
        # Once we evict the troublemaker, the reconnect phase should complete
        # in well under the reconnect timeout.
        self.assertLess(evict_til_active, self.mds_reconnect_timeout * 0.5,
                        "reconnect did not complete soon enough after eviction, took {0}".format(
                            evict_til_active
                        ))

        # We killed earlier so must clean up before trying to use again
        self.mount_a.kill_cleanup()

        # Bring the client back
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        self.mount_a.create_destroy()
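
    # (Note on the helper below: for the read-caps case it creates the file,
    # then remounts before opening it read-only, presumably so the fresh client
    # holds only read capabilities rather than the exclusive caps it would have
    # gained by creating the file itself.)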
    def _test_stale_caps(self, write):
        session_timeout = self.fs.get_var("session_timeout")

        # Capability release from stale session
        # =====================================
        if write:
            cap_holder = self.mount_a.open_background()
        else:
            self.mount_a.run_shell(["touch", "background_file"])
            self.mount_a.umount_wait()
            self.mount_a.mount()
            self.mount_a.wait_until_mounted()
            cap_holder = self.mount_a.open_background(write=False)

        self.assert_session_count(2)
        mount_a_gid = self.mount_a.get_global_id()

        # Wait for the file to be visible from another client, indicating
        # that mount_a has completed its network ops
        self.mount_b.wait_for_visible()

        # Simulate client death
        self.mount_a.kill()

        # wait for it to die so it doesn't voluntarily release buffer cap
        time.sleep(5)

        try:
            # Now, after session_timeout seconds, the waiter should
            # complete their operation when the MDS marks the holder's
            # session stale.
            cap_waiter = self.mount_b.write_background()
            a = time.time()
            cap_waiter.wait()
            b = time.time()

            # Should have succeeded
            self.assertEqual(cap_waiter.exitstatus, 0)

            if write:
                self.assert_session_count(1)
            else:
                self.assert_session_state(mount_a_gid, "stale")

            cap_waited = b - a
            log.info("cap_waiter waited {0}s".format(cap_waited))
            self.assertTrue(session_timeout / 2.0 <= cap_waited <= session_timeout * 2.0,
                            "Capability handover took {0}, expected approx {1}".format(
                                cap_waited, session_timeout
                            ))

            cap_holder.stdin.close()
            try:
                cap_holder.wait()
            except (CommandFailedError, ConnectionLostError):
                # We killed it (and possibly its node), so it raises an error
                pass
        finally:
            # teardown() doesn't quite handle this case cleanly, so help it out
            self.mount_a.kill_cleanup()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

    def test_stale_read_caps(self):
        self._test_stale_caps(False)

    def test_stale_write_caps(self):
        self._test_stale_caps(True)

    def test_evicted_caps(self):
        # Eviction while holding a capability
        # ===================================

        session_timeout = self.fs.get_var("session_timeout")

        # Take out a write capability on a file on client A,
        # and then immediately kill it.
        cap_holder = self.mount_a.open_background()
        mount_a_client_id = self.mount_a.get_global_id()

        # Wait for the file to be visible from another client, indicating
        # that mount_a has completed its network ops
        self.mount_b.wait_for_visible()

        # Simulate client death
        self.mount_a.kill()

        # wait for it to die so it doesn't voluntarily release buffer cap
        time.sleep(5)

        try:
            # The waiter should get stuck waiting for the capability
            # held on the MDS by the now-dead client A
            cap_waiter = self.mount_b.write_background()
            time.sleep(5)
            self.assertFalse(cap_waiter.finished)

            self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
            # Now, because I evicted the old holder of the capability, it should
            # immediately get handed over to the waiter
            a = time.time()
            cap_waiter.wait()
            b = time.time()
            cap_waited = b - a
            log.info("cap_waiter waited {0}s".format(cap_waited))
            # This is the check that it happened 'now' rather than waiting
            # for the session timeout
            self.assertLess(cap_waited, session_timeout / 2.0,
                            "Capability handover took {0}, expected less than {1}".format(
                                cap_waited, session_timeout / 2.0
                            ))

            cap_holder.stdin.close()
            try:
                cap_holder.wait()
            except (CommandFailedError, ConnectionLostError):
                # We killed it (and possibly its node), so it raises an error
                pass
        finally:
            self.mount_a.kill_cleanup()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

    def test_trim_caps(self):
        # Trim capability when reconnecting MDS
        # ===================================

        count = 500
        # Create lots of files
        for i in range(count):
            self.mount_a.run_shell(["touch", "f{0}".format(i)])

        # Populate mount_b's cache
        self.mount_b.run_shell(["ls", "-l"])

        client_id = self.mount_b.get_global_id()
        num_caps = self._session_num_caps(client_id)
        self.assertGreaterEqual(num_caps, count)

        # Restart MDS. client should trim its cache when reconnecting to the MDS
        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        num_caps = self._session_num_caps(client_id)
        self.assertLess(num_caps, count,
                        "should have fewer than {0} capabilities, have {1}".format(
                            count, num_caps
                        ))
    def _is_flockable(self):
        a_version_str = get_package_version(self.mount_a.client_remote, "fuse")
        b_version_str = get_package_version(self.mount_b.client_remote, "fuse")
        flock_version_str = "2.9"

        version_regex = re.compile(r"[0-9\.]+")
        a_result = version_regex.match(a_version_str)
        self.assertTrue(a_result)
        b_result = version_regex.match(b_version_str)
        self.assertTrue(b_result)
        a_version = version.StrictVersion(a_result.group())
        b_version = version.StrictVersion(b_result.group())
        flock_version = version.StrictVersion(flock_version_str)

        if a_version >= flock_version and b_version >= flock_version:
            log.info("flock locks are available")
            return True
        else:
            log.info("not testing flock locks, machines have versions {av} and {bv}".format(
                av=a_version_str, bv=b_version_str))
            return False

    def test_filelock(self):
        """
        Check that file lock doesn't get lost after an MDS restart
        """

        flockable = self._is_flockable()
        lock_holder = self.mount_a.lock_background(do_flock=flockable)

        self.mount_b.wait_for_visible("background_file-2")
        self.mount_b.check_filelock(do_flock=flockable)

        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        self.mount_b.check_filelock(do_flock=flockable)

        # Tear down the background process
        lock_holder.stdin.close()
        try:
            lock_holder.wait()
        except (CommandFailedError, ConnectionLostError):
            # We killed it, so it raises an error
            pass

    def test_filelock_eviction(self):
        """
        Check that file lock held by evicted client is given to
        waiting client.
        """
        if not self._is_flockable():
            self.skipTest("flock is not available")

        lock_holder = self.mount_a.lock_background()
        self.mount_b.wait_for_visible("background_file-2")
        self.mount_b.check_filelock()

        lock_taker = self.mount_b.lock_and_release()
        # Check the taker is waiting (doesn't get it immediately)
        time.sleep(2)
        self.assertFalse(lock_holder.finished)
        self.assertFalse(lock_taker.finished)

        try:
            mount_a_client_id = self.mount_a.get_global_id()
            self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])

            # Evicting mount_a should let mount_b's attempt to take the lock
            # succeed
            self.wait_until_true(lambda: lock_taker.finished, timeout=10)
        finally:
            # teardown() doesn't quite handle this case cleanly, so help it out
            self.mount_a.kill()
            self.mount_a.kill_cleanup()

        # Bring the client back
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

    def test_dir_fsync(self):
        self._test_fsync(True)

    def test_create_fsync(self):
        self._test_fsync(False)

    def _test_fsync(self, dirfsync):
        """
        Check that calls to fsync guarantee visibility of metadata to another
        client immediately after the fsyncing client dies.
        """

        # Leave this guy out until he's needed
        self.mount_b.umount_wait()
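
        # (Note: the inline script below runs on the client node; {path} and
        # {dirfsync} are substituted via .format, which is why the script's own
        # format calls use doubled braces.)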
        # Create dir + child dentry on client A, and fsync the dir
        path = os.path.join(self.mount_a.mountpoint, "subdir")
        self.mount_a.run_python(
            dedent("""
                import os
                import time

                path = "{path}"

                print("Starting creation...")
                start = time.time()
                os.mkdir(path)
                dfd = os.open(path, os.O_DIRECTORY)
                fd = open(os.path.join(path, "childfile"), "w")
                print("Finished creation in {{0}}s".format(time.time() - start))

                print("Starting fsync...")
                start = time.time()
                if {dirfsync}:
                    os.fsync(dfd)
                else:
                    os.fsync(fd)
                print("Finished fsync in {{0}}s".format(time.time() - start))
            """.format(path=path, dirfsync=str(dirfsync))))

        # Immediately kill the MDS and then client A
        self.fs.mds_stop()
        self.fs.mds_fail()
        self.mount_a.kill()
        self.mount_a.kill_cleanup()

        # Restart the MDS. Wait for it to come up, it'll have to time out in clientreplay
        self.fs.mds_restart()
        log.info("Waiting for reconnect...")
        self.fs.wait_for_state("up:reconnect")
        log.info("Waiting for active...")
        self.fs.wait_for_state("up:active", timeout=MDS_RESTART_GRACE + self.mds_reconnect_timeout)
        log.info("Reached active...")

        # Is the child dentry visible from mount B?
        self.mount_b.mount()
        self.mount_b.wait_until_mounted()
        self.mount_b.run_shell(["ls", "subdir/childfile"])

    def test_unmount_for_evicted_client(self):
        """Test that the client does not hang on unmount after it is evicted."""
        mount_a_client_id = self.mount_a.get_global_id()
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])

        self.mount_a.umount_wait(require_clean=True, timeout=30)

    def test_stale_renew(self):
        if not isinstance(self.mount_a, FuseMount):
            raise SkipTest("Require FUSE client to handle signal STOP/CONT")

        session_timeout = self.fs.get_var("session_timeout")

        self.mount_a.run_shell(["mkdir", "testdir"])
        self.mount_a.run_shell(["touch", "testdir/file1"])
        # populate readdir cache
        self.mount_a.run_shell(["ls", "testdir"])
        self.mount_b.run_shell(["ls", "testdir"])

        # check if readdir cache is effective
        initial_readdirs = self.fs.mds_asok(['perf', 'dump', 'mds_server', 'req_readdir_latency'])
        self.mount_b.run_shell(["ls", "testdir"])
        current_readdirs = self.fs.mds_asok(['perf', 'dump', 'mds_server', 'req_readdir_latency'])
        self.assertEqual(current_readdirs, initial_readdirs)
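        # (Note: unchanged req_readdir_latency counters imply the second ls was
        # served from mount_b's readdir cache rather than by the MDS.)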

        mount_b_gid = self.mount_b.get_global_id()
        mount_b_pid = self.mount_b.get_client_pid()
        # stop ceph-fuse process of mount_b
        self.mount_b.client_remote.run(args=["sudo", "kill", "-STOP", mount_b_pid])

        self.assert_session_state(mount_b_gid, "open")
        time.sleep(session_timeout * 1.5)  # Long enough for MDS to consider session stale

        self.mount_a.run_shell(["touch", "testdir/file2"])
        self.assert_session_state(mount_b_gid, "stale")

        # resume ceph-fuse process of mount_b
        self.mount_b.client_remote.run(args=["sudo", "kill", "-CONT", mount_b_pid])
        # Is the new file visible from mount_b? (caps become invalid after session stale)
        self.mount_b.run_shell(["ls", "testdir/file2"])

    def test_abort_conn(self):
        """
        Check that abort_conn() skips closing mds sessions.
        """
        if not isinstance(self.mount_a, FuseMount):
            raise SkipTest("Testing libcephfs function")

        self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])
        session_timeout = self.fs.get_var("session_timeout")

        self.mount_a.umount_wait()
        self.mount_b.umount_wait()
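
        # (Note: abort_conn() is expected to drop the connection without sending
        # a session close, so the MDS keeps the session and only marks it stale
        # after session_timeout, which the assertions below verify.)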
        gid_str = self.mount_a.run_python(dedent("""
            import cephfs as libcephfs
            cephfs = libcephfs.LibCephFS(conffile='')
            cephfs.mount()
            client_id = cephfs.get_instance_id()
            cephfs.abort_conn()
            print(client_id)
            """))
        gid = int(gid_str)

        self.assert_session_state(gid, "open")
        time.sleep(session_timeout * 1.5)  # Long enough for MDS to consider session stale
        self.assert_session_state(gid, "stale")

    def test_dont_mark_unresponsive_client_stale(self):
        """
        Test that an unresponsive client holding caps is not marked stale or
        evicted unless another client wants its caps.
        """
        if not isinstance(self.mount_a, FuseMount):
            self.skipTest("Require FUSE client to handle signal STOP/CONT")

        # XXX: To conduct this test we need at least two clients since a
        # single client is never evicted by the MDS.
        SESSION_TIMEOUT = 30
        SESSION_AUTOCLOSE = 50
        time_at_beg = time.time()
        mount_a_gid = self.mount_a.get_global_id()
        mount_a_pid = self.mount_a.client_pid
        self.fs.set_var('session_timeout', SESSION_TIMEOUT)
        self.fs.set_var('session_autoclose', SESSION_AUTOCLOSE)
        self.assert_session_count(2, self.fs.mds_asok(['session', 'ls']))

        # test that a client holding caps not required by any other client is
        # not marked stale when it becomes unresponsive.
        self.mount_a.run_shell(['mkdir', 'dir'])
        self.mount_a.send_signal('sigstop')
        time.sleep(SESSION_TIMEOUT + 2)
        self.assert_session_state(mount_a_gid, "open")

        # test that other clients have to wait to get the caps from the
        # unresponsive client until session_autoclose.
        self.mount_b.run_shell(['stat', 'dir'])
        self.assert_session_count(1, self.fs.mds_asok(['session', 'ls']))
        self.assertLess(time.time(), time_at_beg + SESSION_AUTOCLOSE)

        self.mount_a.send_signal('sigcont')
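
    # (Note: the test below uses the 'session config' admin-socket command to
    # double one session's timeout, then checks that the killed client's session
    # is still "open" past 1.5x the default timeout and gone only after ~2x.)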
    def test_config_session_timeout(self):
        self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])
        session_timeout = self.fs.get_var("session_timeout")
        mount_a_gid = self.mount_a.get_global_id()

        self.fs.mds_asok(['session', 'config', '%s' % mount_a_gid, 'timeout', '%s' % (session_timeout * 2)])

        self.mount_a.kill()

        self.assert_session_count(2)

        time.sleep(session_timeout * 1.5)
        self.assert_session_state(mount_a_gid, "open")

        time.sleep(session_timeout)
        self.assert_session_count(1)

        self.mount_a.kill_cleanup()