"""
Teuthology task for exercising CephFS client recovery
"""

import logging
from textwrap import dedent
import time
import distutils.version as version
import re
import os

from teuthology.orchestra.run import CommandFailedError, ConnectionLostError
from tasks.cephfs.fuse_mount import FuseMount
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.packaging import get_package_version
from unittest import SkipTest


log = logging.getLogger(__name__)


# Arbitrary timeouts for operations involving restarting
# an MDS or waiting for it to come up
MDS_RESTART_GRACE = 60


class TestClientNetworkRecovery(CephFSTestCase):
    REQUIRE_KCLIENT_REMOTE = True
    REQUIRE_ONE_CLIENT_REMOTE = True
    CLIENTS_REQUIRED = 2

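    # Each name in LOAD_SETTINGS is read from the MDS's configuration by
    # CephFSTestCase.setUp() and stored on the matching attribute below.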
    LOAD_SETTINGS = ["mds_reconnect_timeout", "ms_max_backoff"]

    # Environment references
    mds_reconnect_timeout = None
    ms_max_backoff = None

    def test_network_death(self):
        """
        Simulate software freeze or temporary network failure.

        Check that the client blocks I/O during failure, and completes
        I/O after failure.
        """

        session_timeout = self.fs.get_var("session_timeout")
        self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])

        # We only need one client
        self.mount_b.umount_wait()

        # Initially our one client session should be visible
        client_id = self.mount_a.get_global_id()
        ls_data = self._session_list()
        self.assert_session_count(1, ls_data)
        self.assertEqual(ls_data[0]['id'], client_id)
        self.assert_session_state(client_id, "open")

        # ...and capable of doing I/O without blocking
        self.mount_a.create_files()

        # ...but if we turn off the network
        self.fs.set_clients_block(True)

        # ...and try to start an I/O
        write_blocked = self.mount_a.write_background()

        # ...then it should block
        self.assertFalse(write_blocked.finished)
        self.assert_session_state(client_id, "open")
        time.sleep(session_timeout * 1.5)  # Long enough for MDS to consider session stale
        self.assertFalse(write_blocked.finished)
        self.assert_session_state(client_id, "stale")

        # ...until we re-enable I/O
        self.fs.set_clients_block(False)

        # ...when it should complete promptly
        a = time.time()
        self.wait_until_true(lambda: write_blocked.finished, self.ms_max_backoff * 2)
        write_blocked.wait()  # Already know we're finished, wait() to raise exception on errors
        recovery_time = time.time() - a
        log.info("recovery time: {0}".format(recovery_time))
        self.assert_session_state(client_id, "open")


class TestClientRecovery(CephFSTestCase):
    REQUIRE_KCLIENT_REMOTE = True
    CLIENTS_REQUIRED = 2

    LOAD_SETTINGS = ["mds_reconnect_timeout", "ms_max_backoff"]

    # Environment references
    mds_reconnect_timeout = None
    ms_max_backoff = None

    def test_basic(self):
        # Check that two clients come up healthy and see each others' files
        # =====================================================
        self.mount_a.create_files()
        self.mount_a.check_files()
        self.mount_a.umount_wait()

        self.mount_b.check_files()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # Check that the admin socket interface is correctly reporting
        # two sessions
        # =====================================================
        ls_data = self._session_list()
        self.assert_session_count(2, ls_data)

        self.assertSetEqual(
            set([l['id'] for l in ls_data]),
            {self.mount_a.get_global_id(), self.mount_b.get_global_id()}
        )

    def test_restart(self):
        # Check that after an MDS restart both clients reconnect and continue
        # to handle I/O
        # =====================================================
        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        self.mount_a.create_destroy()
        self.mount_b.create_destroy()

    def _session_num_caps(self, client_id):
        ls_data = self.fs.mds_asok(['session', 'ls'])
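        # If the session is missing, int(None) below raises, failing the test
        # loudly rather than returning a bogus cap count.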
        return int(self._session_by_id(ls_data).get(client_id, {'num_caps': None})['num_caps'])

    def test_reconnect_timeout(self):
        # Reconnect timeout
        # =================
        # Check that if I stop an MDS and a client goes away, the MDS waits
        # for the reconnect period
        self.fs.mds_stop()
        self.fs.mds_fail()

        mount_a_client_id = self.mount_a.get_global_id()
        self.mount_a.umount_wait(force=True)

        self.fs.mds_restart()

        self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
        # Check that the MDS locally reports its state correctly
        status = self.fs.mds_asok(['status'])
        self.assertIn("reconnect_status", status)

        ls_data = self._session_list()
        self.assert_session_count(2, ls_data)

        # The session for the dead client should have the 'reconnect' flag set
        self.assertTrue(self.get_session(mount_a_client_id)['reconnecting'])

        # Wait for the reconnect state to clear, this should take the
        # reconnect timeout period.
        in_reconnect_for = self.fs.wait_for_state('up:active', timeout=self.mds_reconnect_timeout * 2)
        # Check that the period we waited to enter active is within a factor
        # of two of the reconnect timeout.
        self.assertGreater(in_reconnect_for, self.mds_reconnect_timeout / 2,
                           "Should have been in reconnect phase for {0} but only took {1}".format(
                               self.mds_reconnect_timeout, in_reconnect_for
                           ))

        self.assert_session_count(1)

        # Check that the client that timed out during reconnect can
        # mount again and do I/O
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        self.mount_a.create_destroy()

        self.assert_session_count(2)

    def test_reconnect_eviction(self):
        # Eviction during reconnect
        # =========================
        mount_a_client_id = self.mount_a.get_global_id()

        self.fs.mds_stop()
        self.fs.mds_fail()

        # The mount goes away while the MDS is offline
        self.mount_a.kill()

        # wait for it to die
        time.sleep(5)

        self.fs.mds_restart()

        # Enter reconnect phase
        self.fs.wait_for_state('up:reconnect', reject='up:active', timeout=MDS_RESTART_GRACE)
        self.assert_session_count(2)

        # Evict the stuck client
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
        self.assert_session_count(1)

        # Observe that we proceed to active phase without waiting full reconnect timeout
        evict_til_active = self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)
        # Once we evict the troublemaker, the reconnect phase should complete
        # in well under the reconnect timeout.
        self.assertLess(evict_til_active, self.mds_reconnect_timeout * 0.5,
                        "reconnect did not complete soon enough after eviction, took {0}".format(
                            evict_til_active
                        ))

        # We killed earlier so must clean up before trying to use again
        self.mount_a.kill_cleanup()

        # Bring the client back
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        self.mount_a.create_destroy()

    def _test_stale_caps(self, write):
        session_timeout = self.fs.get_var("session_timeout")

        # Capability release from stale session
        # =====================================
        if write:
            cap_holder = self.mount_a.open_background()
        else:
            self.mount_a.run_shell(["touch", "background_file"])
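            # Remount so the client drops the caps it acquired creating the
            # file; the reopened client should then hold only a read cap on it.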
            self.mount_a.umount_wait()
            self.mount_a.mount()
            self.mount_a.wait_until_mounted()
            cap_holder = self.mount_a.open_background(write=False)

        self.assert_session_count(2)
        mount_a_gid = self.mount_a.get_global_id()

        # Wait for the file to be visible from another client, indicating
        # that mount_a has completed its network ops
        self.mount_b.wait_for_visible()

        # Simulate client death
        self.mount_a.kill()

        # wait for it to die so it doesn't voluntarily release buffer cap
        time.sleep(5)

        try:
            # Now, after session_timeout seconds, the waiter should
            # complete their operation when the MDS marks the holder's
            # session stale.
            cap_waiter = self.mount_b.write_background()
            a = time.time()
            cap_waiter.wait()
            b = time.time()

            # Should have succeeded
            self.assertEqual(cap_waiter.exitstatus, 0)

            if write:
                self.assert_session_count(1)
            else:
                self.assert_session_state(mount_a_gid, "stale")

            cap_waited = b - a
            log.info("cap_waiter waited {0}s".format(cap_waited))
            self.assertTrue(session_timeout / 2.0 <= cap_waited <= session_timeout * 2.0,
                            "Capability handover took {0}, expected approx {1}".format(
                                cap_waited, session_timeout
                            ))

            cap_holder.stdin.close()
            try:
                cap_holder.wait()
            except (CommandFailedError, ConnectionLostError):
                # We killed it (and possibly its node), so it raises an error
                pass
        finally:
            # teardown() doesn't quite handle this case cleanly, so help it out
            self.mount_a.kill_cleanup()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

    def test_stale_read_caps(self):
        self._test_stale_caps(False)

    def test_stale_write_caps(self):
        self._test_stale_caps(True)

    def test_evicted_caps(self):
        # Eviction while holding a capability
        # ===================================

        session_timeout = self.fs.get_var("session_timeout")

        # Take out a write capability on a file on client A,
        # and then immediately kill it.
        cap_holder = self.mount_a.open_background()
        mount_a_client_id = self.mount_a.get_global_id()

        # Wait for the file to be visible from another client, indicating
        # that mount_a has completed its network ops
        self.mount_b.wait_for_visible()

        # Simulate client death
        self.mount_a.kill()

        # wait for it to die so it doesn't voluntarily release buffer cap
        time.sleep(5)

        try:
            # The waiter should get stuck waiting for the capability
            # held on the MDS by the now-dead client A
            cap_waiter = self.mount_b.write_background()
            time.sleep(5)
            self.assertFalse(cap_waiter.finished)

            self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
            # Now, because I evicted the old holder of the capability, it should
            # immediately get handed over to the waiter
            a = time.time()
            cap_waiter.wait()
            b = time.time()
            cap_waited = b - a
            log.info("cap_waiter waited {0}s".format(cap_waited))
            # This is the check that it happened 'now' rather than waiting
            # for the session timeout
            self.assertLess(cap_waited, session_timeout / 2.0,
                            "Capability handover took {0}, expected less than {1}".format(
                                cap_waited, session_timeout / 2.0
                            ))

            cap_holder.stdin.close()
            try:
                cap_holder.wait()
            except (CommandFailedError, ConnectionLostError):
                # We killed it (and possibly its node), so it raises an error
                pass
        finally:
            self.mount_a.kill_cleanup()

        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

    def test_trim_caps(self):
        # Trim capability when reconnecting MDS
        # =====================================

        count = 500
        # Create lots of files
        for i in range(count):
            self.mount_a.run_shell(["touch", "f{0}".format(i)])

        # Populate mount_b's cache
        self.mount_b.run_shell(["ls", "-l"])

        client_id = self.mount_b.get_global_id()
        num_caps = self._session_num_caps(client_id)
        self.assertGreaterEqual(num_caps, count)

        # Restart the MDS; the client should trim its cache when reconnecting
        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        num_caps = self._session_num_caps(client_id)
        self.assertLess(num_caps, count,
                        "should have less than {0} capabilities, have {1}".format(
                            count, num_caps
                        ))

    def _is_flockable(self):
        a_version_str = get_package_version(self.mount_a.client_remote, "fuse")
        b_version_str = get_package_version(self.mount_b.client_remote, "fuse")
        flock_version_str = "2.9"

        version_regex = re.compile(r"[0-9\.]+")
        a_result = version_regex.match(a_version_str)
        self.assertTrue(a_result)
        b_result = version_regex.match(b_version_str)
        self.assertTrue(b_result)
        a_version = version.StrictVersion(a_result.group())
        b_version = version.StrictVersion(b_result.group())
        flock_version = version.StrictVersion(flock_version_str)

        if a_version >= flock_version and b_version >= flock_version:
            log.info("flock locks are available")
            return True
        else:
            log.info("not testing flock locks, machines have versions {av} and {bv}".format(
                av=a_version_str, bv=b_version_str))
            return False

    def test_filelock(self):
        """
        Check that a file lock doesn't get lost after an MDS restart
        """

        flockable = self._is_flockable()
        lock_holder = self.mount_a.lock_background(do_flock=flockable)

        self.mount_b.wait_for_visible("background_file-2")
        self.mount_b.check_filelock(do_flock=flockable)

        self.fs.mds_fail_restart()
        self.fs.wait_for_state('up:active', timeout=MDS_RESTART_GRACE)

        self.mount_b.check_filelock(do_flock=flockable)

        # Tear down the background process
        lock_holder.stdin.close()
        try:
            lock_holder.wait()
        except (CommandFailedError, ConnectionLostError):
            # We killed it, so it raises an error
            pass

    def test_filelock_eviction(self):
        """
        Check that a file lock held by an evicted client is handed to a
        waiting client.
        """
        if not self._is_flockable():
            self.skipTest("flock is not available")

        lock_holder = self.mount_a.lock_background()
        self.mount_b.wait_for_visible("background_file-2")
        self.mount_b.check_filelock()

        lock_taker = self.mount_b.lock_and_release()
        # Check the taker is waiting (doesn't get it immediately)
        time.sleep(2)
        self.assertFalse(lock_holder.finished)
        self.assertFalse(lock_taker.finished)

        try:
            mount_a_client_id = self.mount_a.get_global_id()
            self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])

            # Evicting mount_a should let mount_b's attempt to take the lock
            # succeed
            self.wait_until_true(lambda: lock_taker.finished, timeout=10)
        finally:
            # teardown() doesn't quite handle this case cleanly, so help it out
            self.mount_a.kill()
            self.mount_a.kill_cleanup()

        # Bring the client back
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

    def test_dir_fsync(self):
        self._test_fsync(True)

    def test_create_fsync(self):
        self._test_fsync(False)

    def _test_fsync(self, dirfsync):
        """
        That calls to fsync guarantee visibility of metadata to another
        client immediately after the fsyncing client dies.
        """

        # Leave this guy out until he's needed
        self.mount_b.umount_wait()

        # Create dir + child dentry on client A, and fsync the dir
        path = os.path.join(self.mount_a.mountpoint, "subdir")
        self.mount_a.run_python(
            dedent("""
                import os
                import time

                path = "{path}"

                print("Starting creation...")
                start = time.time()

                os.mkdir(path)
                dfd = os.open(path, os.O_DIRECTORY)

                fd = open(os.path.join(path, "childfile"), "w")
                print("Finished creation in {{0}}s".format(time.time() - start))

                print("Starting fsync...")
                start = time.time()
                if {dirfsync}:
                    os.fsync(dfd)
                else:
                    os.fsync(fd)
                print("Finished fsync in {{0}}s".format(time.time() - start))
            """.format(path=path, dirfsync=str(dirfsync)))
        )

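        # Killing the MDS right away means the fsynced metadata can only
        # survive via the MDS journal, which is what fsync must guarantee.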
        # Immediately kill the MDS and then client A
        self.fs.mds_stop()
        self.fs.mds_fail()
        self.mount_a.kill()
        self.mount_a.kill_cleanup()

        # Restart the MDS.  Wait for it to come up, it'll have to time out in clientreplay
        self.fs.mds_restart()
        log.info("Waiting for reconnect...")
        self.fs.wait_for_state("up:reconnect")
        log.info("Waiting for active...")
        self.fs.wait_for_state("up:active", timeout=MDS_RESTART_GRACE + self.mds_reconnect_timeout)
        log.info("Reached active...")

        # Is the child dentry visible from mount B?
        self.mount_b.mount()
        self.mount_b.wait_until_mounted()
        self.mount_b.run_shell(["ls", "subdir/childfile"])

    def test_unmount_for_evicted_client(self):
        """Test that the client does not hang on unmount after being evicted."""
        mount_a_client_id = self.mount_a.get_global_id()
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])

        self.mount_a.umount_wait(require_clean=True, timeout=30)

    def test_stale_renew(self):
        if not isinstance(self.mount_a, FuseMount):
            raise SkipTest("Require FUSE client to handle signal STOP/CONT")

        session_timeout = self.fs.get_var("session_timeout")

        self.mount_a.run_shell(["mkdir", "testdir"])
        self.mount_a.run_shell(["touch", "testdir/file1"])
        # populate readdir cache
        self.mount_a.run_shell(["ls", "testdir"])
        self.mount_b.run_shell(["ls", "testdir"])

        # check if readdir cache is effective
        initial_readdirs = self.fs.mds_asok(['perf', 'dump', 'mds_server', 'req_readdir_latency'])
        self.mount_b.run_shell(["ls", "testdir"])
        current_readdirs = self.fs.mds_asok(['perf', 'dump', 'mds_server', 'req_readdir_latency'])
        self.assertEqual(current_readdirs, initial_readdirs)
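        # Equal perf counters mean the second ls was served from mount_b's
        # cached dentries without sending a new readdir to the MDS.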

        mount_b_gid = self.mount_b.get_global_id()
        mount_b_pid = self.mount_b.get_client_pid()
        # stop ceph-fuse process of mount_b
        self.mount_b.client_remote.run(args=["sudo", "kill", "-STOP", mount_b_pid])

        self.assert_session_state(mount_b_gid, "open")
        time.sleep(session_timeout * 1.5)  # Long enough for MDS to consider session stale

        self.mount_a.run_shell(["touch", "testdir/file2"])
        self.assert_session_state(mount_b_gid, "stale")

        # resume ceph-fuse process of mount_b
        self.mount_b.client_remote.run(args=["sudo", "kill", "-CONT", mount_b_pid])
        # Is the new file visible from mount_b? (caps become invalid after session stale)
        self.mount_b.run_shell(["ls", "testdir/file2"])

    def test_abort_conn(self):
        """
        Check that abort_conn() skips closing mds sessions.
        """
        if not isinstance(self.mount_a, FuseMount):
            raise SkipTest("Testing libcephfs function")

        self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])
        session_timeout = self.fs.get_var("session_timeout")

        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

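        # The snippet below mounts via libcephfs and calls abort_conn(), which
        # drops the connection without sending a session close, so the MDS
        # should keep the session around and eventually mark it stale.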
        gid_str = self.mount_a.run_python(dedent("""
            import cephfs as libcephfs
            cephfs = libcephfs.LibCephFS(conffile='')
            cephfs.mount()
            client_id = cephfs.get_instance_id()
            cephfs.abort_conn()
            print(client_id)
            """)
        )
        gid = int(gid_str)

        self.assert_session_state(gid, "open")
        time.sleep(session_timeout * 1.5)  # Long enough for MDS to consider session stale
        self.assert_session_state(gid, "stale")

    def test_dont_mark_unresponsive_client_stale(self):
        """
        Test that an unresponsive client holding caps is not marked stale or
        evicted unless another client wants its caps.
        """
        if not isinstance(self.mount_a, FuseMount):
            self.skipTest("Require FUSE client to handle signal STOP/CONT")

        # XXX: To conduct this test we need at least two clients since a
        # single client is never evicted by the MDS.
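        # Use shortened session_timeout/session_autoclose values so the test
        # completes quickly rather than waiting for the cluster defaults.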
        SESSION_TIMEOUT = 30
        SESSION_AUTOCLOSE = 50
        time_at_beg = time.time()
        mount_a_gid = self.mount_a.get_global_id()
        mount_a_pid = self.mount_a.client_pid
        self.fs.set_var('session_timeout', SESSION_TIMEOUT)
        self.fs.set_var('session_autoclose', SESSION_AUTOCLOSE)
        self.assert_session_count(2, self.fs.mds_asok(['session', 'ls']))

        # test that a client holding a cap not required by any other client is
        # not marked stale when it becomes unresponsive.
        self.mount_a.run_shell(['mkdir', 'dir'])
        self.mount_a.send_signal('sigstop')
        time.sleep(SESSION_TIMEOUT + 2)
        self.assert_session_state(mount_a_gid, "open")

        # test that other clients have to wait until session_autoclose to get
        # the caps held by the unresponsive client.
        self.mount_b.run_shell(['stat', 'dir'])
        self.assert_session_count(1, self.fs.mds_asok(['session', 'ls']))
        self.assertLess(time.time(), time_at_beg + SESSION_AUTOCLOSE)

        self.mount_a.send_signal('sigcont')

    def test_config_session_timeout(self):
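        """
        Check that a per-session timeout set via the MDS 'session config'
        admin socket command overrides the filesystem-wide session_timeout:
        with the timeout doubled for mount_a's session, the killed client's
        session stays open past the default timeout and is only removed once
        the longer per-session timeout expires.
        """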
        self.fs.mds_asok(['config', 'set', 'mds_defer_session_stale', 'false'])
        session_timeout = self.fs.get_var("session_timeout")
        mount_a_gid = self.mount_a.get_global_id()

        self.fs.mds_asok(['session', 'config', '%s' % mount_a_gid, 'timeout', '%s' % (session_timeout * 2)])

        self.mount_a.kill()

        self.assert_session_count(2)

        time.sleep(session_timeout * 1.5)
        self.assert_session_state(mount_a_gid, "open")

        time.sleep(session_timeout)
        self.assert_session_count(1)

        self.mount_a.kill_cleanup()