]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | |
2 | """ | |
3 | Exercise the MDS's behaviour when clients and the MDCache reach or | |
4 | exceed the limits of how many caps/inodes they should hold. | |
5 | """ | |
6 | ||
7 | import logging | |
8 | from textwrap import dedent | |
9 | from unittest import SkipTest | |
10 | from teuthology.orchestra.run import CommandFailedError | |
11 | from tasks.cephfs.cephfs_test_case import CephFSTestCase, needs_trimming | |
12 | from tasks.cephfs.fuse_mount import FuseMount | |
13 | import os | |
14 | ||
15 | ||
# Module-level logger, shared by all tests in this file.
log = logging.getLogger(__name__)


# Arbitrary timeouts for operations involving restarting
# an MDS or waiting for it to come up
MDS_RESTART_GRACE = 60

# Hardcoded values from Server::recall_client_state
# (must be kept in sync with the MDS source if that code changes)
CAP_RECALL_RATIO = 0.8
CAP_RECALL_MIN = 100
26 | ||
27 | ||
class TestClientLimits(CephFSTestCase):
    """
    Exercise the MDS's behaviour when clients and the MDCache reach or
    exceed the limits of how many caps/inodes they should hold.
    """
    REQUIRE_KCLIENT_REMOTE = True
    CLIENTS_REQUIRED = 2

    def _test_client_pin(self, use_subdir):
        """
        When a client pins an inode in its cache, for example because the file is held open,
        it should reject requests from the MDS to trim these caps.  The MDS should complain
        to the user that it is unable to enforce its cache size limits because of this
        objectionable client.

        :param use_subdir: whether to put test files in a subdir or use root
        """

        cache_size = 100
        open_files = 200

        self.set_conf('mds', 'mds cache size', cache_size)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        mount_a_client_id = self.mount_a.get_global_id()
        path = "subdir/mount_a" if use_subdir else "mount_a"
        open_proc = self.mount_a.open_n_background(path, open_files)

        # Client should now hold:
        # `open_files` caps for the open files
        # 1 cap for root
        # 1 cap for subdir
        self.wait_until_equal(lambda: self.get_session(mount_a_client_id)['num_caps'],
                              open_files + (2 if use_subdir else 1),
                              timeout=600,
                              reject_fn=lambda x: x > open_files + 2)

        # MDS should not be happy about that, as the client is failing to comply
        # with the SESSION_RECALL messages it is being sent.
        # NOTE(review): this waits on "MDS_HEALTH_CLIENT_RECALL" while
        # test_client_release_bug waits on "MDS_CLIENT_RECALL" -- confirm both
        # health check names are valid for the MDS version under test.
        mds_recall_state_timeout = int(self.fs.get_config("mds_recall_state_timeout"))
        self.wait_for_health("MDS_HEALTH_CLIENT_RECALL",
                             mds_recall_state_timeout + 10)

        # We can also test that the MDS health warning for oversized
        # cache is functioning as intended.
        self.wait_for_health("MDS_CACHE_OVERSIZED",
                             mds_recall_state_timeout + 10)

        # When the client closes the files, it should retain only as many caps as allowed
        # under the SESSION_RECALL policy
        log.info("Terminating process holding files open")
        open_proc.stdin.close()
        try:
            open_proc.wait()
        except CommandFailedError:
            # We killed it, so it raises an error
            pass

        # The remaining caps should comply with the numbers sent from MDS in SESSION_RECALL
        # message, which depend on the cache size and recall ratio (see module-level
        # CAP_RECALL_RATIO, mirroring Server::recall_client_state).
        expected_caps = int(cache_size * 0.8)
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            expected_caps,
            timeout=600,
            reject_fn=lambda x: x < expected_caps)

    @needs_trimming
    def test_client_pin_root(self):
        # Pin files directly in the filesystem root
        self._test_client_pin(False)

    @needs_trimming
    def test_client_pin(self):
        # Pin files inside a subdirectory
        self._test_client_pin(True)

    def test_client_release_bug(self):
        """
        When a client has a bug (which we will simulate) preventing it from releasing caps,
        the MDS should notice that releases are not being sent promptly, and generate a health
        metric to that effect.
        """

        # The debug hook to inject the failure only exists in the fuse client
        if not isinstance(self.mount_a, FuseMount):
            raise SkipTest("Require FUSE client to inject client release failure")

        self.set_conf('client.{0}'.format(self.mount_a.client_id), 'client inject release failure', 'true')
        self.mount_a.teardown()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        mount_a_client_id = self.mount_a.get_global_id()

        # Client A creates a file.  He will hold the write caps on the file, and later
        # (simulated bug) fail to comply with the MDS's request to release that cap.
        self.mount_a.run_shell(["touch", "file1"])

        # Client B starts a background write to the file that client A created; it will
        # block until client A's caps are released (which, due to the injected bug, only
        # happens when client A is evicted below).
        rproc = self.mount_b.write_background("file1")

        # After mds_revoke_cap_timeout, we should see a health warning (extra lag from
        # MDS beacon period)
        mds_revoke_cap_timeout = int(self.fs.get_config("mds_revoke_cap_timeout"))
        self.wait_for_health("MDS_CLIENT_RECALL", mds_revoke_cap_timeout + 10)

        # Client B should still be stuck
        self.assertFalse(rproc.finished)

        # Kill client A
        self.mount_a.kill()
        self.mount_a.kill_cleanup()

        # Client B should complete once the MDS evicts the dead session
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id])
        rproc.wait()

    def test_client_oldest_tid(self):
        """
        When a client does not advance its oldest tid, the MDS should notice that
        and generate health warnings.
        """

        # num of requests client issues
        max_requests = 1000

        # The debug hook to inject the failure only exists in the fuse client
        if not isinstance(self.mount_a, FuseMount):
            raise SkipTest("Require FUSE client to inject a fixed oldest tid")

        self.set_conf('client', 'client inject fixed oldest tid', 'true')
        self.mount_a.teardown()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        self.fs.mds_asok(['config', 'set', 'mds_max_completed_requests', '{0}'.format(max_requests)])

        # Create lots of files
        self.mount_a.create_n_files("testdir/file1", max_requests + 100)

        # Create a few files synchronously. This makes sure previous requests are completed
        self.mount_a.create_n_files("testdir/file2", 5, True)

        # Wait for the health warnings. Assume mds can handle 10 request per second at least.
        # Integer division: the timeout must stay an int-like value on both py2 and py3.
        self.wait_for_health("MDS_CLIENT_OLDEST_TID", max_requests // 10)

    def _test_client_cache_size(self, mount_subdir):
        """
        check if client invalidate kernel dcache according to its cache size config

        :param mount_subdir: whether to mount a subdirectory (rather than the
            filesystem root) before running the check
        """

        # The dentry-count introspection and cache-size knob only exist in the fuse client
        if not isinstance(self.mount_a, FuseMount):
            raise SkipTest("Require FUSE client to inspect dentry cache trimming")

        if mount_subdir:
            # fuse assigns a fix inode number (1) to root inode. But in mounting into
            # subdir case, the actual inode number of root is not 1. This mismatch
            # confuses fuse_lowlevel_notify_inval_entry() when invalidating dentries
            # in root directory.
            self.mount_a.run_shell(["mkdir", "subdir"])
            self.mount_a.umount_wait()
            self.set_conf('client', 'client mountpoint', '/subdir')
            self.mount_a.mount()
            self.mount_a.wait_until_mounted()
            root_ino = self.mount_a.path_to_ino(".")
            self.assertEqual(root_ino, 1)

        dir_path = os.path.join(self.mount_a.mountpoint, "testdir")

        # Script run on the client host: creates testdir plus num_dirs child dirs.
        # The {{0}} survives the outer .format() as {0} for the runtime .format(n).
        mkdir_script = dedent("""
            import os
            os.mkdir("{path}")
            for n in range(0, {num_dirs}):
                os.mkdir("{path}/dir{{0}}".format(n))
            """)

        num_dirs = 1000
        self.mount_a.run_python(mkdir_script.format(path=dir_path, num_dirs=num_dirs))
        self.mount_a.run_shell(["sync"])

        # All the fresh dentries should currently be cached and pinned
        dentry_count, dentry_pinned_count = self.mount_a.get_dentry_count()
        self.assertGreaterEqual(dentry_count, num_dirs)
        self.assertGreaterEqual(dentry_pinned_count, num_dirs)

        # Shrink the client cache well below the dentry count; the client should
        # trim down to the new limit.  Integer division keeps this an int on py3.
        cache_size = num_dirs // 10
        self.mount_a.set_cache_size(cache_size)

        def trimmed():
            # True once both total and pinned dentry counts fit the new cache size
            dentry_count, dentry_pinned_count = self.mount_a.get_dentry_count()
            log.info("waiting, dentry_count, dentry_pinned_count: {0}, {1}".format(
                dentry_count, dentry_pinned_count
            ))
            return dentry_count <= cache_size and dentry_pinned_count <= cache_size

        self.wait_until_true(trimmed, 30)

    @needs_trimming
    def test_client_cache_size(self):
        self._test_client_cache_size(False)
        self._test_client_cache_size(True)