]>
Commit | Line | Data |
---|---|---|
81eedcae | 1 | import time |
7c673cae FG |
2 | import json |
3 | import logging | |
7c673cae FG |
4 | |
5 | from tasks.cephfs.fuse_mount import FuseMount | |
6 | from teuthology.exceptions import CommandFailedError | |
7 | from tasks.cephfs.cephfs_test_case import CephFSTestCase | |
8 | ||
9 | log = logging.getLogger(__name__) | |
10 | ||
11 | ||
12 | class TestSessionMap(CephFSTestCase): | |
13 | CLIENTS_REQUIRED = 2 | |
14 | MDSS_REQUIRED = 2 | |
15 | ||
def test_tell_session_drop(self):
    """
    Check that a `tell` command sent using the python CLI does not leave
    an MDS session behind once the command has terminated.
    """
    # Unmount both clients before looking at the session list, which is
    # asserted to be empty below.
    for mount in (self.mount_a, self.mount_b):
        mount.umount_wait()

    fs_status = self.fs.status()

    # Issue the command via `tell` ...
    self.fs.rank_tell(["session", "ls"], status=fs_status)

    # ... then list the sessions over the admin socket and expect none.
    remaining_sessions = self.fs.rank_asok(['session', 'ls'], status=fs_status)
    self.assertEqual(len(remaining_sessions), 0)
29 | ||
81eedcae TL |
def _get_connection_count(self, status=None):
    """
    Return the sum of the `msgr_active_connections` counters found in the
    `perf dump` output of the rank, considering only the sections whose
    name contains "AsyncMessenger::Worker".

    `status` is passed straight through to `rank_asok`. The result is 0
    when no section matches.
    """
    perf_dump = self.fs.rank_asok(["perf", "dump"], status=status)
    return sum(
        counters['msgr_active_connections']
        for section, counters in perf_dump.items()
        if "AsyncMessenger::Worker" in section
    )
7c673cae FG |
37 | |
def test_tell_conn_close(self):
    """
    Check that after a `tell` command is sent using the python CLI, the
    connection count goes back to where it started (i.e. the command is
    not leaving connections open).
    """
    # NOTE(review): presumably makes the messenger reap closed connections
    # promptly so the counter settles within the timeout -- confirm.
    self.config_set('mds', 'ms_async_reap_threshold', '1')

    for mount in (self.mount_a, self.mount_b):
        mount.umount_wait()

    fs_status = self.fs.status()
    baseline = self._get_connection_count(status=fs_status)
    self.fs.rank_tell(["session", "ls"], status=fs_status)

    def back_to_baseline():
        return self._get_connection_count(status=fs_status) == baseline

    # Poll instead of asserting once: the count is given up to 30 seconds
    # to return to the value sampled before the command.
    self.wait_until_true(back_to_baseline, timeout=30)
7c673cae FG |
56 | |
def test_mount_conn_close(self):
    """
    Check that when a client unmounts, the connection count on the MDS
    goes back to what it was before the client mounted.
    """
    # NOTE(review): presumably makes the messenger reap closed connections
    # promptly so the counter settles within the timeout -- confirm.
    self.config_set('mds', 'ms_async_reap_threshold', '1')

    for mount in (self.mount_a, self.mount_b):
        mount.umount_wait()

    fs_status = self.fs.status()
    baseline = self._get_connection_count(status=fs_status)

    # Mounting a client must raise the count above the baseline ...
    self.mount_a.mount_wait()
    mounted_count = self._get_connection_count(status=fs_status)
    self.assertGreater(mounted_count, baseline)

    # ... and unmounting it must eventually bring the count back down.
    self.mount_a.umount_wait()

    def back_to_baseline():
        return self._get_connection_count(status=fs_status) == baseline

    self.wait_until_true(back_to_baseline, timeout=30)
7c673cae FG |
76 | |
def test_version_splitting(self):
    """
    Check that when many sessions are updated, they are correctly split
    into multiple versions to obey mds_sessionmap_keys_per_op.
    """

    def rank_name(current_status):
        # Name of the MDS daemon that get_rank() reports for this status.
        return self.fs.get_rank(status=current_status)['name']

    def persisted_sessions(current_status):
        # Flush rank 0's journal, read the session table back with the
        # table tool, log it, check the result code and hand back the
        # stored sessions.
        self.fs.rank_asok(["flush", "journal"], rank=0, status=current_status)
        table = json.loads(self.fs.table_tool(["0", "show", "session"]))
        log.info("SessionMap: {0}".format(json.dumps(table, indent=2)))
        self.assertEqual(table['0']['result'], 0)
        return table['0']['data']['sessions']

    self.mount_a.umount_wait()
    self.mount_b.umount_wait()

    # Configure MDS to write one OMAP key at once
    self.set_conf('mds', 'mds_sessionmap_keys_per_op', 1)
    self.fs.mds_fail_restart()
    status = self.fs.wait_for_daemons()

    # Bring the clients back
    self.mount_a.mount_wait()
    self.mount_b.mount_wait()

    # Both clients should hold a session, in memory and on disk
    self.assert_session_count(2, mds_id=rank_name(status))
    self.assertEqual(len(persisted_sessions(status)), 2)

    # Now, induce a "force_open_sessions" event by exporting a dir
    self.mount_a.run_shell(["mkdir", "bravo"])
    self.mount_a.run_shell(["touch", "bravo/file_a"])
    self.mount_b.run_shell(["touch", "bravo/file_b"])

    self.fs.set_max_mds(2)
    status = self.fs.wait_for_daemons()

    def rank1_omap_writes():
        objecter_perf = self.fs.rank_asok(
            ['perf', 'dump', 'objecter'], rank=1, status=status)
        return objecter_perf['objecter']['omap_wr']

    # Flush so that there are no dirty sessions on rank 1
    self.fs.rank_asok(["flush", "journal"], rank=1, status=status)

    # Export so that we get a force_open to rank 1 for the two sessions
    # from rank 0
    writes_before_export = rank1_omap_writes()
    self.fs.rank_asok(['export', 'dir', '/bravo', '1'], rank=0, status=status)

    # This is the critical (if rather subtle) check: in the process of
    # doing an export dir we hit force_open_sessions, and as a result we
    # end up writing out the sessionmap. Two sessions are dirtied here,
    # and because keys_per_op is 1 we should see a single session get
    # written out (the first of the two, triggered by the second getting
    # marked dirty).
    # The number of writes is two per session, because the header
    # (sessionmap version) update and the KV write both count. Also,
    # multiply by 2 for each openfile table update.
    expected_new_writes = 2 * 2

    def export_writes_seen():
        return rank1_omap_writes() - writes_before_export == expected_new_writes

    self.wait_until_true(
        export_writes_seen,
        timeout=30  # Long enough for an export to get acked
    )

    # Now end our sessions and check the backing sessionmap is updated
    # correctly
    self.mount_a.umount_wait()
    self.mount_b.umount_wait()

    # In-memory sessionmap check
    self.assert_session_count(0, mds_id=rank_name(status))

    # On-disk sessionmap check
    self.assertEqual(len(persisted_sessions(status)), 0)
7c673cae | 148 | |
7c673cae FG |
def _configure_auth(self, mount, id_name, mds_caps, osd_caps=None, mon_caps=None):
    """
    Create (or fetch) the auth credentials for a client mount and write
    out the keyring for the client to use.

    When left as None, osd_caps falls back to "allow rw" and mon_caps to
    "allow r".
    """
    osd_caps = "allow rw" if osd_caps is None else osd_caps
    mon_caps = "allow r" if mon_caps is None else mon_caps
    entity = "client.{name}".format(name=id_name)

    keyring = self.fs.mon_manager.raw_cluster_cmd(
        "auth", "get-or-create", entity,
        "mds", mds_caps,
        "osd", osd_caps,
        "mon", mon_caps
    )

    # The mount takes on the new identity before its keyring path is
    # queried. NOTE(review): presumably get_keyring_path() depends on
    # client_id -- confirm.
    mount.client_id = id_name
    mount.client_remote.write_file(mount.get_keyring_path(), keyring, sudo=True)

    # Point the client's configuration at the keyring just written.
    self.set_conf(entity, "keyring", mount.get_keyring_path())
170 | ||
def test_session_reject(self):
    """
    Check that a client claiming a root outside the path allowed by its
    MDS caps fails to mount, and that the denial is reported in the
    cluster log.
    """
    if not isinstance(self.mount_a, FuseMount):
        self.skipTest("Requires FUSE client to inject client metadata")

    # Create the directory tree with mount A, then get it out of the way
    for path in ("foo", "foo/bar"):
        self.mount_a.run_shell(["mkdir", path])
    self.mount_a.umount_wait()

    # Mount B will be my rejected client
    rejected = self.mount_b
    rejected.umount_wait()

    # Configure a client that is limited to /foo/bar
    self._configure_auth(rejected, "badguy", "allow rw path=/foo/bar")

    # Check he can mount that dir and do IO
    rejected.mount_wait(cephfs_mntpt="/foo/bar")
    rejected.create_destroy()
    rejected.umount_wait()

    # Configure the client to claim that its mount point metadata is /baz
    self.set_conf("client.badguy", "client_metadata", "root=/baz")

    # Try to mount the client, see that it fails and that the denial is
    # logged
    denial_message = "client session with non-allowable root '/baz' denied"
    with self.assert_cluster_log(denial_message), \
            self.assertRaises(CommandFailedError):
        rejected.mount_wait(cephfs_mntpt="/foo/bar")
81eedcae | 195 | |
def test_session_evict_blocklisted(self):
    """
    Check that mds evicts blocklisted client
    """
    if not isinstance(self.mount_a, FuseMount):
        self.skipTest("Requires FUSE client to use "
                      "mds_cluster.is_addr_blocklisted()")

    self.fs.set_max_mds(2)
    status = self.fs.wait_for_daemons()

    def mds_name(rank):
        # Name of the MDS daemon holding `rank` in the captured status.
        return self.fs.get_rank(rank=rank, status=status)['name']

    # Pin one directory to each rank and wait for the subtrees to settle
    self.mount_a.run_shell_payload("mkdir {d0,d1} && touch {d0,d1}/file")
    self.mount_a.setfattr("d0", "ceph.dir.pin", "0")
    self.mount_a.setfattr("d1", "ceph.dir.pin", "1")
    self._wait_subtrees([('/d0', 0), ('/d1', 1)], status=status)

    # Have both clients touch a file under each pinned directory
    self.mount_a.run_shell(["touch", "d0/f0"])
    self.mount_a.run_shell(["touch", "d1/f0"])
    self.mount_b.run_shell(["touch", "d0/f1"])
    self.mount_b.run_shell(["touch", "d1/f1"])

    # Each rank is expected to report two sessions at this point
    self.assert_session_count(2, mds_id=mds_name(0))
    self.assert_session_count(2, mds_id=mds_name(1))

    # Evict mount A on rank 0 and wait for its address to be blocklisted
    evicted_client_id = self.mount_a.get_global_id()
    self.fs.mds_asok(['session', 'evict', "%s" % evicted_client_id],
                     mds_id=mds_name(0))

    def mount_a_blocklisted():
        return self.mds_cluster.is_addr_blocklisted(
            self.mount_a.get_global_addr())

    self.wait_until_true(mount_a_blocklisted, timeout=30)

    # 10 seconds should be enough for evicting client
    time.sleep(10)

    # Only one session is expected to remain on each rank
    self.assert_session_count(1, mds_id=mds_name(0))
    self.assert_session_count(1, mds_id=mds_name(1))

    # Clean up the evicted mount and bring it back
    self.mount_a.kill_cleanup()
    self.mount_a.mount_wait()