"""
Exercise the MDS's behaviour when clients and the MDCache reach or
exceed the limits of how many caps/inodes they should hold.
"""
import logging
import os
from textwrap import dedent
from unittest import SkipTest

from teuthology.orchestra.run import CommandFailedError

from tasks.cephfs.cephfs_test_case import CephFSTestCase, needs_trimming
from tasks.cephfs.fuse_mount import FuseMount
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Arbitrary timeouts for operations involving restarting
# an MDS or waiting for it to come up
MDS_RESTART_GRACE = 60

# Hardcoded values from Server::recall_client_state: the MDS asks clients
# to trim down to this fraction of the configured cache size.
CAP_RECALL_RATIO = 0.8
class TestClientLimits(CephFSTestCase):
    """
    Exercise the MDS's behaviour when clients and the MDCache reach or
    exceed the limits of how many caps/inodes they should hold.
    """
    REQUIRE_KCLIENT_REMOTE = True
    # Two mounts: test_client_release_bug drives mount_a and mount_b.
    CLIENTS_REQUIRED = 2

    def _test_client_pin(self, use_subdir):
        """
        When a client pins an inode in its cache, for example because the file is held open,
        it should reject requests from the MDS to trim these caps.  The MDS should complain
        to the user that it is unable to enforce its cache size limits because of this
        objectionable client.

        :param use_subdir: whether to put test files in a subdir or use root
        """

        # NOTE(review): these two assignments were lost in extraction and are
        # reconstructed — confirm against upstream before relying on exact values.
        open_files = 250
        # Deliberately smaller than the number of open files, so the MDS must
        # ask the client to release caps (which it can't, files being open).
        cache_size = open_files // 2

        self.set_conf('mds', 'mds cache size', cache_size)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        mount_a_client_id = self.mount_a.get_global_id()
        path = "subdir/mount_a" if use_subdir else "mount_a"
        open_proc = self.mount_a.open_n_background(path, open_files)

        # Client should now hold:
        # `open_files` caps for the open files
        # 1 cap for the root, plus 1 for the subdir when use_subdir is set
        self.wait_until_equal(lambda: self.get_session(mount_a_client_id)['num_caps'],
                              open_files + (2 if use_subdir else 1),
                              timeout=600,
                              reject_fn=lambda x: x > open_files + 2)

        # MDS should not be happy about that, as the client is failing to comply
        # with the SESSION_RECALL messages it is being sent
        mds_recall_state_timeout = int(self.fs.get_config("mds_recall_state_timeout"))
        self.wait_for_health("MDS_HEALTH_CLIENT_RECALL",
                             mds_recall_state_timeout + 10)

        # We can also test that the MDS health warning for oversized
        # cache is functioning as intended.
        self.wait_for_health("MDS_CACHE_OVERSIZED",
                             mds_recall_state_timeout + 10)

        # When the client closes the files, it should retain only as many caps as allowed
        # under the SESSION_RECALL policy
        log.info("Terminating process holding files open")
        open_proc.stdin.close()
        try:
            open_proc.wait()
        except CommandFailedError:
            # We killed it, so it raises an error
            pass

        # The remaining caps should comply with the numbers sent from MDS in SESSION_RECALL message,
        # which depend on the cache size and overall ratio
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            int(cache_size * 0.8),
            timeout=600,
            reject_fn=lambda x: x < int(cache_size * .8))

    def test_client_pin_root(self):
        # Pin files directly under the filesystem root.
        self._test_client_pin(False)

    def test_client_pin(self):
        # Pin files inside a subdirectory.
        self._test_client_pin(True)

    def test_client_release_bug(self):
        """
        When a client has a bug (which we will simulate) preventing it from releasing caps,
        the MDS should notice that releases are not being sent promptly, and generate a health
        metric to that effect.
        """

        # The debug hook to inject the failure only exists in the fuse client
        if not isinstance(self.mount_a, FuseMount):
            raise SkipTest("Require FUSE client to inject client release failure")

        self.set_conf('client.{0}'.format(self.mount_a.client_id), 'client inject release failure', 'true')
        self.mount_a.teardown()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        mount_a_client_id = self.mount_a.get_global_id()

        # Client A creates a file.  He will hold the write caps on the file, and later (simulated bug) fail
        # to comply with the MDSs request to release that cap
        self.mount_a.run_shell(["touch", "file1"])

        # Client B tries to stat the file that client A created
        rproc = self.mount_b.write_background("file1")

        # After mds_revoke_cap_timeout, we should see a health warning (extra lag from
        # MDS beacon period)
        mds_revoke_cap_timeout = int(self.fs.get_config("mds_revoke_cap_timeout"))
        self.wait_for_health("MDS_CLIENT_RECALL", mds_revoke_cap_timeout + 10)

        # Client B should still be stuck
        self.assertFalse(rproc.finished)

        # Kill client A, then evict its session so its unreleased caps are dropped
        self.mount_a.kill()
        self.mount_a.kill_cleanup()

        # Client B should complete
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
        rproc.wait()

    def test_client_oldest_tid(self):
        """
        When a client does not advance its oldest tid, the MDS should notice that
        and generate health warnings.
        """

        # num of requests client issues
        # NOTE(review): reconstructed value — the original assignment was lost
        # in extraction; confirm against upstream.
        max_requests = 1000

        # The debug hook to inject the failure only exists in the fuse client
        if not isinstance(self.mount_a, FuseMount):
            raise SkipTest("Require FUSE client to inject client release failure")

        self.set_conf('client', 'client inject fixed oldest tid', 'true')
        self.mount_a.teardown()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        self.fs.mds_asok(['config', 'set', 'mds_max_completed_requests', '{0}'.format(max_requests)])

        # Create lots of files
        self.mount_a.create_n_files("testdir/file1", max_requests + 100)

        # Create a few files synchronously. This makes sure previous requests are completed
        self.mount_a.create_n_files("testdir/file2", 5, True)

        # Wait for the health warnings. Assume mds can handle 10 request per second at least
        self.wait_for_health("MDS_CLIENT_OLDEST_TID", max_requests // 10)

    def _test_client_cache_size(self, mount_subdir):
        """
        check if client invalidate kernel dcache according to its cache size config

        :param mount_subdir: whether to remount the client under /subdir first
        """

        # The debug hook to inject the failure only exists in the fuse client
        if not isinstance(self.mount_a, FuseMount):
            raise SkipTest("Require FUSE client to inject client release failure")

        if mount_subdir:
            # fuse assigns a fix inode number (1) to root inode. But in mounting into
            # subdir case, the actual inode number of root is not 1. This mismatch
            # confuses fuse_lowlevel_notify_inval_entry() when invalidating dentries
            # in root directory.
            self.mount_a.run_shell(["mkdir", "subdir"])
            self.mount_a.umount_wait()
            self.set_conf('client', 'client mountpoint', '/subdir')
            self.mount_a.mount()
            self.mount_a.wait_until_mounted()
            root_ino = self.mount_a.path_to_ino(".")
            self.assertEqual(root_ino, 1)

        dir_path = os.path.join(self.mount_a.mountpoint, "testdir")

        # Create many directories on the remote, in one python invocation,
        # so the client's dentry cache fills up.
        mkdir_script = dedent("""
            import os
            os.mkdir("{path}")
            for n in range(0, {num_dirs}):
                os.mkdir("{path}/dir{{0}}".format(n))
            """)

        # NOTE(review): reconstructed value — the original assignment was lost
        # in extraction; confirm against upstream.
        num_dirs = 1000
        self.mount_a.run_python(mkdir_script.format(path=dir_path, num_dirs=num_dirs))
        self.mount_a.run_shell(["sync"])

        dentry_count, dentry_pinned_count = self.mount_a.get_dentry_count()
        self.assertGreaterEqual(dentry_count, num_dirs)
        self.assertGreaterEqual(dentry_pinned_count, num_dirs)

        # Shrink the client cache well below the dentry count; the client
        # should invalidate kernel dentries down to the new limit.
        cache_size = num_dirs // 10
        self.mount_a.set_cache_size(cache_size)

        def trimmed():
            # True once both total and pinned dentry counts fit the cache size.
            dentry_count, dentry_pinned_count = self.mount_a.get_dentry_count()
            log.info("waiting, dentry_count, dentry_pinned_count: {0}, {1}".format(
                dentry_count, dentry_pinned_count
            ))
            if dentry_count > cache_size or dentry_pinned_count > cache_size:
                return False
            return True

        self.wait_until_true(trimmed, 30)

    @needs_trimming
    def test_client_cache_size(self):
        self._test_client_cache_size(False)
        self._test_client_cache_size(True)