from tasks.ceph_test_case import TestTimeoutError
from tasks.cephfs.cephfs_test_case import CephFSTestCase, needs_trimming
from tasks.cephfs.fuse_mount import FuseMount
+from teuthology.exceptions import CommandFailedError
import os
+from io import StringIO
log = logging.getLogger(__name__)
a fraction of second (0.5) by default when throttling condition is met.
"""
- max_caps_per_client = 500
- cap_acquisition_throttle = 250
+ subdir_count = 4
+ files_per_dir = 25
- self.config_set('mds', 'mds_max_caps_per_client', max_caps_per_client)
- self.config_set('mds', 'mds_session_cap_acquisition_throttle', cap_acquisition_throttle)
+ # throttle in a way so that two dir reads are already hitting it.
+ throttle_value = (files_per_dir * 3) // 2
- # Create 1500 files split across 6 directories, 250 each.
- for i in range(1, 7):
- self.mount_a.create_n_files("dir{0}/file".format(i), cap_acquisition_throttle, sync=True)
+ # activate throttling logic by setting max per client to a low value
+ self.config_set('mds', 'mds_max_caps_per_client', 1)
+ self.config_set('mds', 'mds_session_cap_acquisition_throttle', throttle_value)
- mount_a_client_id = self.mount_a.get_global_id()
+ # Create files split across {subdir_count} directories, {per_dir_count} in each dir
+ for i in range(1, subdir_count+1):
+ self.mount_a.create_n_files("dir{0}/file".format(i), files_per_dir, sync=True)
- # recursive readdir
- self.mount_a.run_shell_payload("find | wc")
+ mount_a_client_id = self.mount_a.get_global_id()
- # validate cap_acquisition decay counter after readdir to exceed throttle count i.e 250
- cap_acquisition_value = self.get_session(mount_a_client_id)['cap_acquisition']['value']
- self.assertGreaterEqual(cap_acquisition_value, cap_acquisition_throttle)
+ # recursive readdir. macOs wants an explicit directory for `find`.
+ proc = self.mount_a.run_shell_payload("find . | wc", stderr=StringIO())
+ # return code may be None if the command got interrupted
+ self.assertTrue(proc.returncode is None or proc.returncode == 0, proc.stderr.getvalue())
# validate the throttle condition to be hit atleast once
cap_acquisition_throttle_hit_count = self.perf_dump()['mds_server']['cap_acquisition_throttle']
self.assertGreaterEqual(cap_acquisition_throttle_hit_count, 1)
+ # validate cap_acquisition decay counter after readdir to NOT exceed the throttle value
+ # plus one batch that could have been taken immediately before querying
+ # assuming the batch is equal to the per dir file count.
+ cap_acquisition_value = self.get_session(mount_a_client_id)['cap_acquisition']['value']
+ self.assertLessEqual(cap_acquisition_value, files_per_dir + throttle_value)
+
+ # make sure that the throttle was reported in the events
+ def historic_ops_have_event(expected_event):
+ ops_dump = self.fs.rank_tell(['dump_historic_ops'])
+ # reverse the events and the ops assuming that later ops would be throttled
+ for op in reversed(ops_dump['ops']):
+ for ev in reversed(op.get('type_data', {}).get('events', [])):
+ if ev['event'] == expected_event:
+ return True
+ return False
+
+ self.assertTrue(historic_ops_have_event('cap_acquisition_throttle'))
+
def test_client_release_bug(self):
"""
When a client has a bug (which we will simulate) preventing it from releasing caps,
self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
rproc.wait()
+ def test_client_blocklisted_oldest_tid(self):
+ """
+ that a client is blocklisted when its encoded session metadata exceeds the
+ configured threshold (due to ever growing `completed_requests` caused due
+ to an unidentified bug (in the client or the MDS)).
+ """
+
+ # num of requests client issues
+ max_requests = 10000
+
+ # The debug hook to inject the failure only exists in the fuse client
+ if not isinstance(self.mount_a, FuseMount):
+ self.skipTest("Require FUSE client to inject client release failure")
+
+ self.config_set('client', 'client inject fixed oldest tid', 'true')
+ self.mount_a.teardown()
+ self.mount_a.mount_wait()
+
+ self.config_set('mds', 'mds_max_completed_requests', max_requests);
+
+ # Create lots of files
+ self.mount_a.create_n_files("testdir/file1", max_requests + 100)
+
+ # Create a few files synchronously. This makes sure previous requests are completed
+ self.mount_a.create_n_files("testdir/file2", 5, True)
+
+ # Wait for the health warnings. Assume mds can handle 10 request per second at least
+ self.wait_for_health("MDS_CLIENT_OLDEST_TID", max_requests // 10, check_in_detail=str(self.mount_a.client_id))
+
+ # set the threshold low so that it has a high probability of
+ # hitting.
+ self.config_set('mds', 'mds_session_metadata_threshold', 5000);
+
+ # Create lot many files synchronously. This would hit the session metadata threshold
+ # causing the client to get blocklisted.
+ with self.assertRaises(CommandFailedError):
+ self.mount_a.create_n_files("testdir/file2", 100000, True)
+
+ self.mds_cluster.is_addr_blocklisted(self.mount_a.get_global_addr())
+ # the mds should bump up the relevant perf counter
+ pd = self.perf_dump()
+ self.assertGreater(pd['mds_sessions']['mdthresh_evicted'], 0)
+
+ # reset the config
+ self.config_set('client', 'client inject fixed oldest tid', 'false')
+
+ self.mount_a.kill_cleanup()
+ self.mount_a.mount_wait()
+
def test_client_oldest_tid(self):
"""
When a client does not advance its oldest tid, the MDS should notice that