import time
import json
import logging

from tasks.cephfs.fuse_mount import FuseMount
from teuthology.exceptions import CommandFailedError
from tasks.cephfs.cephfs_test_case import CephFSTestCase

log = logging.getLogger(__name__)


class TestSessionMap(CephFSTestCase):
    CLIENTS_REQUIRED = 2
    MDSS_REQUIRED = 2

    def test_tell_session_drop(self):
        """
        That when a `tell` command is sent using the python CLI,
        its MDS session is gone after it terminates
        """
        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        status = self.fs.status()
        self.fs.rank_tell(["session", "ls"], status=status)

        ls_data = self.fs.rank_asok(['session', 'ls'], status=status)
        self.assertEqual(len(ls_data), 0)

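    # Helper: sum msgr_active_connections across the MDS's AsyncMessenger
    # worker threads, as reported by the rank's `perf dump`.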
    def _get_connection_count(self, status=None):
        perf = self.fs.rank_asok(["perf", "dump"], status=status)
        conn = 0
        for module, dump in perf.items():
            if "AsyncMessenger::Worker" in module:
                conn += dump['msgr_active_connections']
        return conn

    def test_tell_conn_close(self):
        """
        That when a `tell` command is sent using the python CLI,
        the conn count goes back to where it started (i.e. we aren't
        leaving connections open)
        """
        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        status = self.fs.status()
        s = self._get_connection_count(status=status)
        self.fs.rank_tell(["session", "ls"], status=status)
        e = self._get_connection_count(status=status)

        self.assertEqual(s, e)

    def test_mount_conn_close(self):
        """
        That when a client unmounts, the connection count on the MDS goes
        back to what it was before the client mounted
        """
        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        status = self.fs.status()
        s = self._get_connection_count(status=status)
        self.mount_a.mount_wait()
        self.assertGreater(self._get_connection_count(status=status), s)
        self.mount_a.umount_wait()
        e = self._get_connection_count(status=status)

        self.assertEqual(s, e)

    def test_version_splitting(self):
        """
        That when many sessions are updated, they are correctly
        split into multiple versions to obey mds_sessionmap_keys_per_op
        """

        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        # Configure MDS to write one OMAP key at once
        self.set_conf('mds', 'mds_sessionmap_keys_per_op', 1)
        self.fs.mds_fail_restart()
        status = self.fs.wait_for_daemons()

        # Bring the clients back
        self.mount_a.mount_wait()
        self.mount_b.mount_wait()

        # See that they've got sessions
        self.assert_session_count(2, mds_id=self.fs.get_rank(status=status)['name'])

        # See that we persist their sessions
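        # (flushing the journal below persists the sessionmap, so the
        # table_tool check reads the current on-disk state)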
        self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
        table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
        log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
        self.assertEqual(table_json['0']['result'], 0)
        self.assertEqual(len(table_json['0']['data']['sessions']), 2)

        # Now, induce a "force_open_sessions" event by exporting a dir
        self.mount_a.run_shell(["mkdir", "bravo"])
        self.mount_a.run_shell(["touch", "bravo/file_a"])
        self.mount_b.run_shell(["touch", "bravo/file_b"])

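        # Add a second active MDS so there is a rank 1 to export the
        # directory to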
        self.fs.set_max_mds(2)
        status = self.fs.wait_for_daemons()

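        # Rank 1's objecter "omap_wr" perf counter counts the OMAP write
        # operations that MDS has issued; we use it below to observe the
        # sessionmap writes triggered by the export.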
        def get_omap_wrs():
            return self.fs.rank_asok(['perf', 'dump', 'objecter'], rank=1, status=status)['objecter']['omap_wr']

        # Flush so that there are no dirty sessions on rank 1
        self.fs.rank_asok(["flush", "journal"], rank=1, status=status)

        # Export so that we get a force_open to rank 1 for the two sessions from rank 0
        initial_omap_wrs = get_omap_wrs()
        self.fs.rank_asok(['export', 'dir', '/bravo', '1'], rank=0, status=status)

        # This is the critical (if rather subtle) check: that in the process of doing an export dir,
        # we hit force_open_sessions, and as a result we end up writing out the sessionmap. There
        # will be two sessions dirtied here, and because we have set keys_per_op to 1, we should see
        # a single session get written out (the first of the two, triggered by the second getting marked
        # dirty)
        # The number of writes is two per session, because the header (sessionmap version) update and
        # KV write both count. Also, multiply by 2 for each openfile table update.
        self.wait_until_true(
            lambda: get_omap_wrs() - initial_omap_wrs == 2*2,
            timeout=30  # Long enough for an export to get acked
        )

        # Now end our sessions and check the backing sessionmap is updated correctly
        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        # In-memory sessionmap check
        self.assert_session_count(0, mds_id=self.fs.get_rank(status=status)['name'])

        # On-disk sessionmap check
        self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
        table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
        log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
        self.assertEqual(table_json['0']['result'], 0)
        self.assertEqual(len(table_json['0']['data']['sessions']), 0)

    def _configure_auth(self, mount, id_name, mds_caps, osd_caps=None, mon_caps=None):
        """
        Set up auth credentials for a client mount, and write out the keyring
        for the client to use.
        """

        if osd_caps is None:
            osd_caps = "allow rw"

        if mon_caps is None:
            mon_caps = "allow r"

        out = self.fs.mon_manager.raw_cluster_cmd(
            "auth", "get-or-create", "client.{name}".format(name=id_name),
            "mds", mds_caps,
            "osd", osd_caps,
            "mon", mon_caps
        )
        mount.client_id = id_name
        mount.client_remote.write_file(mount.get_keyring_path(), out, sudo=True)
        self.set_conf("client.{name}".format(name=id_name), "keyring", mount.get_keyring_path())

    def test_session_reject(self):
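        """
        That a client restricted to /foo/bar by its MDS caps can mount and
        use that subtree, but is denied a session if the mount root it
        reports does not fall within the restriction.
        """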
        if not isinstance(self.mount_a, FuseMount):
            self.skipTest("Requires FUSE client to inject client metadata")

        self.mount_a.run_shell(["mkdir", "foo"])
        self.mount_a.run_shell(["mkdir", "foo/bar"])
        self.mount_a.umount_wait()

        # Mount B will be the rejected client
        self.mount_b.umount_wait()

        # Configure a client that is limited to /foo/bar
        self._configure_auth(self.mount_b, "badguy", "allow rw path=/foo/bar")
        # Check that it can mount that dir and do IO
        self.mount_b.mount_wait(cephfs_mntpt="/foo/bar")
        self.mount_b.create_destroy()
        self.mount_b.umount_wait()

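        # Now check the rejection path: the MDS compares the root claimed in
        # the client's metadata against the "path=" restriction in its caps,
        # so a client claiming /baz should have its session denied.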
        # Configure the client to claim that its mount point metadata is /baz
        self.set_conf("client.badguy", "client_metadata", "root=/baz")
        # Try to mount the client, see that it fails
        with self.assert_cluster_log("client session with non-allowable root '/baz' denied"):
            with self.assertRaises(CommandFailedError):
                self.mount_b.mount_wait(cephfs_mntpt="/foo/bar")

    def test_session_evict_blocklisted(self):
        """
        Check that the MDS evicts a blocklisted client
        """
        if not isinstance(self.mount_a, FuseMount):
            self.skipTest("Requires FUSE client to use "
                          "mds_cluster.is_addr_blocklisted()")

        self.fs.set_max_mds(2)
        status = self.fs.wait_for_daemons()

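        # Pin d0 to rank 0 and d1 to rank 1, then touch files in both
        # directories from each client so that both ranks hold a session
        # for each client.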
        self.mount_a.run_shell_payload("mkdir {d0,d1} && touch {d0,d1}/file")
        self.mount_a.setfattr("d0", "ceph.dir.pin", "0")
        self.mount_a.setfattr("d1", "ceph.dir.pin", "1")
        self._wait_subtrees([('/d0', 0), ('/d1', 1)], status=status)

        self.mount_a.run_shell(["touch", "d0/f0"])
        self.mount_a.run_shell(["touch", "d1/f0"])
        self.mount_b.run_shell(["touch", "d0/f1"])
        self.mount_b.run_shell(["touch", "d1/f1"])

        self.assert_session_count(2, mds_id=self.fs.get_rank(rank=0, status=status)['name'])
        self.assert_session_count(2, mds_id=self.fs.get_rank(rank=1, status=status)['name'])

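        # Evict the client on rank 0; this blocklists the client's address,
        # and rank 1 is expected to notice the blocklist and drop its own
        # session for the same client (checked below).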
        mount_a_client_id = self.mount_a.get_global_id()
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id],
                         mds_id=self.fs.get_rank(rank=0, status=status)['name'])
        self.wait_until_true(lambda: self.mds_cluster.is_addr_blocklisted(
            self.mount_a.get_global_addr()), timeout=30)

        # 10 seconds should be enough for evicting the client
        time.sleep(10)
        self.assert_session_count(1, mds_id=self.fs.get_rank(rank=0, status=status)['name'])
        self.assert_session_count(1, mds_id=self.fs.get_rank(rank=1, status=status)['name'])

        self.mount_a.kill_cleanup()
        self.mount_a.mount_wait()