]> git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/cephfs/test_sessionmap.py
import ceph quincy 17.2.4
[ceph.git] / ceph / qa / tasks / cephfs / test_sessionmap.py
1 import time
2 import json
3 import logging
4
5 from tasks.cephfs.fuse_mount import FuseMount
6 from teuthology.exceptions import CommandFailedError
7 from tasks.cephfs.cephfs_test_case import CephFSTestCase
8
9 log = logging.getLogger(__name__)
10
11
12 class TestSessionMap(CephFSTestCase):
13 CLIENTS_REQUIRED = 2
14 MDSS_REQUIRED = 2
15
16 def test_tell_session_drop(self):
17 """
18 That when a `tell` command is sent using the python CLI,
19 its MDS session is gone after it terminates
20 """
21 self.mount_a.umount_wait()
22 self.mount_b.umount_wait()
23
24 status = self.fs.status()
25 self.fs.rank_tell(["session", "ls"], status=status)
26
27 ls_data = self.fs.rank_asok(['session', 'ls'], status=status)
28 self.assertEqual(len(ls_data), 0)
29
30 def _get_connection_count(self, status=None):
31 perf = self.fs.rank_asok(["perf", "dump"], status=status)
32 conn = 0
33 for module, dump in perf.items():
34 if "AsyncMessenger::Worker" in module:
35 conn += dump['msgr_active_connections']
36 return conn
37
38 def test_tell_conn_close(self):
39 """
40 That when a `tell` command is sent using the python CLI,
41 the conn count goes back to where it started (i.e. we aren't
42 leaving connections open)
43 """
44 self.config_set('mds', 'ms_async_reap_threshold', '1')
45
46 self.mount_a.umount_wait()
47 self.mount_b.umount_wait()
48
49 status = self.fs.status()
50 s = self._get_connection_count(status=status)
51 self.fs.rank_tell(["session", "ls"], status=status)
52 self.wait_until_true(
53 lambda: self._get_connection_count(status=status) == s,
54 timeout=30
55 )
56
57 def test_mount_conn_close(self):
58 """
59 That when a client unmounts, the thread count on the MDS goes back
60 to what it was before the client mounted
61 """
62 self.config_set('mds', 'ms_async_reap_threshold', '1')
63
64 self.mount_a.umount_wait()
65 self.mount_b.umount_wait()
66
67 status = self.fs.status()
68 s = self._get_connection_count(status=status)
69 self.mount_a.mount_wait()
70 self.assertGreater(self._get_connection_count(status=status), s)
71 self.mount_a.umount_wait()
72 self.wait_until_true(
73 lambda: self._get_connection_count(status=status) == s,
74 timeout=30
75 )
76
77 def test_version_splitting(self):
78 """
79 That when many sessions are updated, they are correctly
80 split into multiple versions to obey mds_sessionmap_keys_per_op
81 """
82
83 self.mount_a.umount_wait()
84 self.mount_b.umount_wait()
85
86 # Configure MDS to write one OMAP key at once
87 self.set_conf('mds', 'mds_sessionmap_keys_per_op', 1)
88 self.fs.mds_fail_restart()
89 status = self.fs.wait_for_daemons()
90
91 # Bring the clients back
92 self.mount_a.mount_wait()
93 self.mount_b.mount_wait()
94
95 # See that they've got sessions
96 self.assert_session_count(2, mds_id=self.fs.get_rank(status=status)['name'])
97
98 # See that we persist their sessions
99 self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
100 table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
101 log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
102 self.assertEqual(table_json['0']['result'], 0)
103 self.assertEqual(len(table_json['0']['data']['sessions']), 2)
104
105 # Now, induce a "force_open_sessions" event by exporting a dir
106 self.mount_a.run_shell(["mkdir", "bravo"])
107 self.mount_a.run_shell(["touch", "bravo/file_a"])
108 self.mount_b.run_shell(["touch", "bravo/file_b"])
109
110 self.fs.set_max_mds(2)
111 status = self.fs.wait_for_daemons()
112
113 def get_omap_wrs():
114 return self.fs.rank_asok(['perf', 'dump', 'objecter'], rank=1, status=status)['objecter']['omap_wr']
115
116 # Flush so that there are no dirty sessions on rank 1
117 self.fs.rank_asok(["flush", "journal"], rank=1, status=status)
118
119 # Export so that we get a force_open to rank 1 for the two sessions from rank 0
120 initial_omap_wrs = get_omap_wrs()
121 self.fs.rank_asok(['export', 'dir', '/bravo', '1'], rank=0, status=status)
122
123 # This is the critical (if rather subtle) check: that in the process of doing an export dir,
124 # we hit force_open_sessions, and as a result we end up writing out the sessionmap. There
125 # will be two sessions dirtied here, and because we have set keys_per_op to 1, we should see
126 # a single session get written out (the first of the two, triggered by the second getting marked
127 # dirty)
128 # The number of writes is two per session, because the header (sessionmap version) update and
129 # KV write both count. Also, multiply by 2 for each openfile table update.
130 self.wait_until_true(
131 lambda: get_omap_wrs() - initial_omap_wrs == 2*2,
132 timeout=30 # Long enough for an export to get acked
133 )
134
135 # Now end our sessions and check the backing sessionmap is updated correctly
136 self.mount_a.umount_wait()
137 self.mount_b.umount_wait()
138
139 # In-memory sessionmap check
140 self.assert_session_count(0, mds_id=self.fs.get_rank(status=status)['name'])
141
142 # On-disk sessionmap check
143 self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
144 table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
145 log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
146 self.assertEqual(table_json['0']['result'], 0)
147 self.assertEqual(len(table_json['0']['data']['sessions']), 0)
148
149 def _configure_auth(self, mount, id_name, mds_caps, osd_caps=None, mon_caps=None):
150 """
151 Set up auth credentials for a client mount, and write out the keyring
152 for the client to use.
153 """
154
155 if osd_caps is None:
156 osd_caps = "allow rw"
157
158 if mon_caps is None:
159 mon_caps = "allow r"
160
161 out = self.fs.mon_manager.raw_cluster_cmd(
162 "auth", "get-or-create", "client.{name}".format(name=id_name),
163 "mds", mds_caps,
164 "osd", osd_caps,
165 "mon", mon_caps
166 )
167 mount.client_id = id_name
168 mount.client_remote.write_file(mount.get_keyring_path(), out, sudo=True)
169 self.set_conf("client.{name}".format(name=id_name), "keyring", mount.get_keyring_path())
170
171 def test_session_reject(self):
172 if not isinstance(self.mount_a, FuseMount):
173 self.skipTest("Requires FUSE client to inject client metadata")
174
175 self.mount_a.run_shell(["mkdir", "foo"])
176 self.mount_a.run_shell(["mkdir", "foo/bar"])
177 self.mount_a.umount_wait()
178
179 # Mount B will be my rejected client
180 self.mount_b.umount_wait()
181
182 # Configure a client that is limited to /foo/bar
183 self._configure_auth(self.mount_b, "badguy", "allow rw path=/foo/bar")
184 # Check he can mount that dir and do IO
185 self.mount_b.mount_wait(cephfs_mntpt="/foo/bar")
186 self.mount_b.create_destroy()
187 self.mount_b.umount_wait()
188
189 # Configure the client to claim that its mount point metadata is /baz
190 self.set_conf("client.badguy", "client_metadata", "root=/baz")
191 # Try to mount the client, see that it fails
192 with self.assert_cluster_log("client session with non-allowable root '/baz' denied"):
193 with self.assertRaises(CommandFailedError):
194 self.mount_b.mount_wait(cephfs_mntpt="/foo/bar")
195
196 def test_session_evict_blocklisted(self):
197 """
198 Check that mds evicts blocklisted client
199 """
200 if not isinstance(self.mount_a, FuseMount):
201 self.skipTest("Requires FUSE client to use "
202 "mds_cluster.is_addr_blocklisted()")
203
204 self.fs.set_max_mds(2)
205 status = self.fs.wait_for_daemons()
206
207 self.mount_a.run_shell_payload("mkdir {d0,d1} && touch {d0,d1}/file")
208 self.mount_a.setfattr("d0", "ceph.dir.pin", "0")
209 self.mount_a.setfattr("d1", "ceph.dir.pin", "1")
210 self._wait_subtrees([('/d0', 0), ('/d1', 1)], status=status)
211
212 self.mount_a.run_shell(["touch", "d0/f0"])
213 self.mount_a.run_shell(["touch", "d1/f0"])
214 self.mount_b.run_shell(["touch", "d0/f1"])
215 self.mount_b.run_shell(["touch", "d1/f1"])
216
217 self.assert_session_count(2, mds_id=self.fs.get_rank(rank=0, status=status)['name'])
218 self.assert_session_count(2, mds_id=self.fs.get_rank(rank=1, status=status)['name'])
219
220 mount_a_client_id = self.mount_a.get_global_id()
221 self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id],
222 mds_id=self.fs.get_rank(rank=0, status=status)['name'])
223 self.wait_until_true(lambda: self.mds_cluster.is_addr_blocklisted(
224 self.mount_a.get_global_addr()), timeout=30)
225
226 # 10 seconds should be enough for evicting client
227 time.sleep(10)
228 self.assert_session_count(1, mds_id=self.fs.get_rank(rank=0, status=status)['name'])
229 self.assert_session_count(1, mds_id=self.fs.get_rank(rank=1, status=status)['name'])
230
231 self.mount_a.kill_cleanup()
232 self.mount_a.mount_wait()