]> git.proxmox.com Git - ceph.git/blame - ceph/qa/tasks/cephfs/test_sessionmap.py
update sources to v12.1.1
[ceph.git] / ceph / qa / tasks / cephfs / test_sessionmap.py
CommitLineData
7c673cae
FG
1from StringIO import StringIO
2import json
3import logging
4from unittest import SkipTest
5
6from tasks.cephfs.fuse_mount import FuseMount
7from teuthology.exceptions import CommandFailedError
8from tasks.cephfs.cephfs_test_case import CephFSTestCase
9
10log = logging.getLogger(__name__)
11
12
13class TestSessionMap(CephFSTestCase):
14 CLIENTS_REQUIRED = 2
15 MDSS_REQUIRED = 2
16
17 def test_tell_session_drop(self):
18 """
19 That when a `tell` command is sent using the python CLI,
20 its MDS session is gone after it terminates
21 """
22 self.mount_a.umount_wait()
23 self.mount_b.umount_wait()
24
25 mds_id = self.fs.get_lone_mds_id()
26 self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")
27
28 ls_data = self.fs.mds_asok(['session', 'ls'])
29 self.assertEqual(len(ls_data), 0)
30
31 def _get_thread_count(self, mds_id):
32 remote = self.fs.mds_daemons[mds_id].remote
33
34 ps_txt = remote.run(
35 args=["ps", "-ww", "axo", "nlwp,cmd"],
36 stdout=StringIO()
37 ).stdout.getvalue().strip()
38 lines = ps_txt.split("\n")[1:]
39
40 for line in lines:
41 if "ceph-mds" in line and not "daemon-helper" in line:
42 if line.find("-i {0}".format(mds_id)) != -1:
43 log.info("Found ps line for daemon: {0}".format(line))
44 return int(line.split()[0])
45
46 raise RuntimeError("No process found in ps output for MDS {0}: {1}".format(
47 mds_id, ps_txt
48 ))
49
50 def test_tell_conn_close(self):
51 """
52 That when a `tell` command is sent using the python CLI,
53 the thread count goes back to where it started (i.e. we aren't
54 leaving connections open)
55 """
56 self.mount_a.umount_wait()
57 self.mount_b.umount_wait()
58
59 mds_id = self.fs.get_lone_mds_id()
60
61 initial_thread_count = self._get_thread_count(mds_id)
62 self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")
63 final_thread_count = self._get_thread_count(mds_id)
64
65 self.assertEqual(initial_thread_count, final_thread_count)
66
67 def test_mount_conn_close(self):
68 """
69 That when a client unmounts, the thread count on the MDS goes back
70 to what it was before the client mounted
71 """
72 self.mount_a.umount_wait()
73 self.mount_b.umount_wait()
74
75 mds_id = self.fs.get_lone_mds_id()
76
77 initial_thread_count = self._get_thread_count(mds_id)
78 self.mount_a.mount()
79 self.mount_a.wait_until_mounted()
80 self.assertGreater(self._get_thread_count(mds_id), initial_thread_count)
81 self.mount_a.umount_wait()
82 final_thread_count = self._get_thread_count(mds_id)
83
84 self.assertEqual(initial_thread_count, final_thread_count)
85
86 def test_version_splitting(self):
87 """
88 That when many sessions are updated, they are correctly
89 split into multiple versions to obey mds_sessionmap_keys_per_op
90 """
91
92 # Start umounted
93 self.mount_a.umount_wait()
94 self.mount_b.umount_wait()
95
96 # Configure MDS to write one OMAP key at once
97 self.set_conf('mds', 'mds_sessionmap_keys_per_op', 1)
98 self.fs.mds_fail_restart()
99 self.fs.wait_for_daemons()
100
101 # I would like two MDSs, so that I can do an export dir later
7c673cae
FG
102 self.fs.set_max_mds(2)
103 self.fs.wait_for_daemons()
104
105 active_mds_names = self.fs.get_active_names()
106 rank_0_id = active_mds_names[0]
107 rank_1_id = active_mds_names[1]
108 log.info("Ranks 0 and 1 are {0} and {1}".format(
109 rank_0_id, rank_1_id))
110
111 # Bring the clients back
112 self.mount_a.mount()
113 self.mount_b.mount()
114 self.mount_a.create_files() # Kick the client into opening sessions
115 self.mount_b.create_files()
116
117 # See that they've got sessions
118 self.assert_session_count(2, mds_id=rank_0_id)
119
120 # See that we persist their sessions
121 self.fs.mds_asok(["flush", "journal"], rank_0_id)
122 table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
123 log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
124 self.assertEqual(table_json['0']['result'], 0)
125 self.assertEqual(len(table_json['0']['data']['Sessions']), 2)
126
127 # Now, induce a "force_open_sessions" event by exporting a dir
128 self.mount_a.run_shell(["mkdir", "bravo"])
129 self.mount_a.run_shell(["touch", "bravo/file"])
130 self.mount_b.run_shell(["ls", "-l", "bravo/file"])
131
132 def get_omap_wrs():
133 return self.fs.mds_asok(['perf', 'dump', 'objecter'], rank_1_id)['objecter']['omap_wr']
134
135 # Flush so that there are no dirty sessions on rank 1
136 self.fs.mds_asok(["flush", "journal"], rank_1_id)
137
138 # Export so that we get a force_open to rank 1 for the two sessions from rank 0
139 initial_omap_wrs = get_omap_wrs()
140 self.fs.mds_asok(['export', 'dir', '/bravo', '1'], rank_0_id)
141
142 # This is the critical (if rather subtle) check: that in the process of doing an export dir,
143 # we hit force_open_sessions, and as a result we end up writing out the sessionmap. There
144 # will be two sessions dirtied here, and because we have set keys_per_op to 1, we should see
145 # a single session get written out (the first of the two, triggered by the second getting marked
146 # dirty)
147 # The number of writes is two per session, because the header (sessionmap version) update and
148 # KV write both count.
149 self.wait_until_true(
150 lambda: get_omap_wrs() - initial_omap_wrs == 2,
151 timeout=10 # Long enough for an export to get acked
152 )
153
154 # Now end our sessions and check the backing sessionmap is updated correctly
155 self.mount_a.umount_wait()
156 self.mount_b.umount_wait()
157
158 # In-memory sessionmap check
159 self.assert_session_count(0, mds_id=rank_0_id)
160
161 # On-disk sessionmap check
162 self.fs.mds_asok(["flush", "journal"], rank_0_id)
163 table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
164 log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
165 self.assertEqual(table_json['0']['result'], 0)
166 self.assertEqual(len(table_json['0']['data']['Sessions']), 0)
167
168 def _sudo_write_file(self, remote, path, data):
169 """
170 Write data to a remote file as super user
171
172 :param remote: Remote site.
173 :param path: Path on the remote being written to.
174 :param data: Data to be written.
175
176 Both perms and owner are passed directly to chmod.
177 """
178 remote.run(
179 args=[
180 'sudo',
181 'python',
182 '-c',
183 'import shutil, sys; shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))',
184 path,
185 ],
186 stdin=data,
187 )
188
189 def _configure_auth(self, mount, id_name, mds_caps, osd_caps=None, mon_caps=None):
190 """
191 Set up auth credentials for a client mount, and write out the keyring
192 for the client to use.
193 """
194
195 if osd_caps is None:
196 osd_caps = "allow rw"
197
198 if mon_caps is None:
199 mon_caps = "allow r"
200
201 out = self.fs.mon_manager.raw_cluster_cmd(
202 "auth", "get-or-create", "client.{name}".format(name=id_name),
203 "mds", mds_caps,
204 "osd", osd_caps,
205 "mon", mon_caps
206 )
207 mount.client_id = id_name
208 self._sudo_write_file(mount.client_remote, mount.get_keyring_path(), out)
209 self.set_conf("client.{name}".format(name=id_name), "keyring", mount.get_keyring_path())
210
211 def test_session_reject(self):
212 if not isinstance(self.mount_a, FuseMount):
213 raise SkipTest("Requires FUSE client to inject client metadata")
214
215 self.mount_a.run_shell(["mkdir", "foo"])
216 self.mount_a.run_shell(["mkdir", "foo/bar"])
217 self.mount_a.umount_wait()
218
219 # Mount B will be my rejected client
220 self.mount_b.umount_wait()
221
222 # Configure a client that is limited to /foo/bar
223 self._configure_auth(self.mount_b, "badguy", "allow rw path=/foo/bar")
224 # Check he can mount that dir and do IO
225 self.mount_b.mount(mount_path="/foo/bar")
226 self.mount_b.wait_until_mounted()
227 self.mount_b.create_destroy()
228 self.mount_b.umount_wait()
229
230 # Configure the client to claim that its mount point metadata is /baz
231 self.set_conf("client.badguy", "client_metadata", "root=/baz")
232 # Try to mount the client, see that it fails
233 with self.assert_cluster_log("client session with invalid root '/baz' denied"):
234 with self.assertRaises(CommandFailedError):
235 self.mount_b.mount(mount_path="/foo/bar")