from StringIO import StringIO
import json
import logging
from unittest import SkipTest

from tasks.cephfs.fuse_mount import FuseMount
from teuthology.exceptions import CommandFailedError
from tasks.cephfs.cephfs_test_case import CephFSTestCase

log = logging.getLogger(__name__)


class TestSessionMap(CephFSTestCase):
    CLIENTS_REQUIRED = 2
    MDSS_REQUIRED = 2

    def test_tell_session_drop(self):
        """
        That when a `tell` command is sent using the python CLI,
        its MDS session is gone after it terminates
        """
        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        mds_id = self.fs.get_lone_mds_id()
        self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")

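        # Query via the admin socket rather than the CLI, so that this
        # check does not itself open a new client session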
        ls_data = self.fs.mds_asok(['session', 'ls'])
        self.assertEqual(len(ls_data), 0)

    def _get_thread_count(self, mds_id):
        """
        Return the number of threads (ps's 'nlwp' column) in the MDS daemon process
        """
        remote = self.fs.mds_daemons[mds_id].remote

        ps_txt = remote.run(
            args=["ps", "-ww", "axo", "nlwp,cmd"],
            stdout=StringIO()
        ).stdout.getvalue().strip()
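        # Drop the "NLWP CMD" header row; each remaining line pairs a thread
        # count with a command line, e.g. "  17 /usr/bin/ceph-mds -i a ..."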
        lines = ps_txt.split("\n")[1:]

        for line in lines:
            if "ceph-mds" in line and "daemon-helper" not in line:
                if "-i {0}".format(mds_id) in line:
                    log.info("Found ps line for daemon: {0}".format(line))
                    return int(line.split()[0])

        raise RuntimeError("No process found in ps output for MDS {0}: {1}".format(
            mds_id, ps_txt
        ))

    def test_tell_conn_close(self):
        """
        That when a `tell` command is sent using the python CLI,
        the thread count goes back to where it started (i.e. we aren't
        leaving connections open)
        """
        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        mds_id = self.fs.get_lone_mds_id()

        initial_thread_count = self._get_thread_count(mds_id)
        self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")
        final_thread_count = self._get_thread_count(mds_id)

        self.assertEqual(initial_thread_count, final_thread_count)

    def test_mount_conn_close(self):
        """
        That when a client unmounts, the thread count on the MDS goes back
        to what it was before the client mounted
        """
        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        mds_id = self.fs.get_lone_mds_id()

        initial_thread_count = self._get_thread_count(mds_id)
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        self.assertGreater(self._get_thread_count(mds_id), initial_thread_count)
        self.mount_a.umount_wait()
        final_thread_count = self._get_thread_count(mds_id)

        self.assertEqual(initial_thread_count, final_thread_count)

    def test_version_splitting(self):
        """
        That when many sessions are updated, they are correctly
        split into multiple versions to obey mds_sessionmap_keys_per_op
        """

        # Start umounted
        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        # Configure the MDS to write one OMAP key at a time
        self.set_conf('mds', 'mds_sessionmap_keys_per_op', 1)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        # I would like two MDSs, so that I can do an export dir later
        self.fs.set_max_mds(2)
        self.fs.wait_for_daemons()

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]
        log.info("Ranks 0 and 1 are {0} and {1}".format(
            rank_0_id, rank_1_id))

        # Bring the clients back
        self.mount_a.mount()
        self.mount_b.mount()
        self.mount_a.create_files()  # Kick the client into opening sessions
        self.mount_b.create_files()

        # See that they've got sessions
        self.assert_session_count(2, mds_id=rank_0_id)

        # See that we persist their sessions
        self.fs.mds_asok(["flush", "journal"], rank_0_id)
        table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
        log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
        self.assertEqual(table_json['0']['result'], 0)
        self.assertEqual(len(table_json['0']['data']['Sessions']), 2)

        # Now, induce a "force_open_sessions" event by exporting a dir
        self.mount_a.run_shell(["mkdir", "bravo"])
        self.mount_a.run_shell(["touch", "bravo/file"])
        self.mount_b.run_shell(["ls", "-l", "bravo/file"])

        def get_omap_wrs():
            return self.fs.mds_asok(['perf', 'dump', 'objecter'], rank_1_id)['objecter']['omap_wr']

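        # The objecter's 'omap_wr' perf counter tallies OMAP write operations
        # issued by the daemon, so the delta across the export below measures
        # how many sessionmap writes rank 1 performs.
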
        # Flush so that there are no dirty sessions on rank 1
        self.fs.mds_asok(["flush", "journal"], rank_1_id)

        # Export so that we get a force_open to rank 1 for the two sessions from rank 0
        initial_omap_wrs = get_omap_wrs()
        self.fs.mds_asok(['export', 'dir', '/bravo', '1'], rank_0_id)

        # This is the critical (if rather subtle) check: that in the process of doing an export dir,
        # we hit force_open_sessions, and as a result we end up writing out the sessionmap.  There
        # will be two sessions dirtied here, and because we have set keys_per_op to 1, we should see
        # a single session get written out (the first of the two, triggered by the second getting
        # marked dirty).
        # The number of writes is two per session, because the header (sessionmap version) update
        # and the KV write both count.  Also, multiply by 2 for each openfile table update.
        self.wait_until_true(
            lambda: get_omap_wrs() - initial_omap_wrs == 2*2,
            timeout=30  # Long enough for an export to get acked
        )

        # Now end our sessions and check the backing sessionmap is updated correctly
        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        # In-memory sessionmap check
        self.assert_session_count(0, mds_id=rank_0_id)

        # On-disk sessionmap check
        self.fs.mds_asok(["flush", "journal"], rank_0_id)
        table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
        log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
        self.assertEqual(table_json['0']['result'], 0)
        self.assertEqual(len(table_json['0']['data']['Sessions']), 0)

    def _sudo_write_file(self, remote, path, data):
        """
        Write data to a remote file as super user

        :param remote: Remote site.
        :param path: Path on the remote being written to.
        :param data: Data to be written.
        """
        remote.run(
            args=[
                'sudo',
                'python',
                '-c',
                'import shutil, sys; shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))',
                path,
            ],
            stdin=data,
        )

    def _configure_auth(self, mount, id_name, mds_caps, osd_caps=None, mon_caps=None):
        """
        Set up auth credentials for a client mount, and write out the keyring
        for the client to use.
        """

        if osd_caps is None:
            osd_caps = "allow rw"

        if mon_caps is None:
            mon_caps = "allow r"

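        # 'auth get-or-create' returns the generated keyring on stdout; the
        # equivalent shell command would be e.g.:
        #   ceph auth get-or-create client.badguy \
        #       mds "allow rw path=/foo/bar" osd "allow rw" mon "allow r"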
        out = self.fs.mon_manager.raw_cluster_cmd(
            "auth", "get-or-create", "client.{name}".format(name=id_name),
            "mds", mds_caps,
            "osd", osd_caps,
            "mon", mon_caps
        )
        mount.client_id = id_name
        self._sudo_write_file(mount.client_remote, mount.get_keyring_path(), out)
        self.set_conf("client.{name}".format(name=id_name), "keyring", mount.get_keyring_path())

    def test_session_reject(self):
        if not isinstance(self.mount_a, FuseMount):
            raise SkipTest("Requires FUSE client to inject client metadata")

        self.mount_a.run_shell(["mkdir", "foo"])
        self.mount_a.run_shell(["mkdir", "foo/bar"])
        self.mount_a.umount_wait()

        # Mount B will be my rejected client
        self.mount_b.umount_wait()

        # Configure a client that is limited to /foo/bar
        self._configure_auth(self.mount_b, "badguy", "allow rw path=/foo/bar")
        # Check that it can mount that dir and do IO
        self.mount_b.mount(mount_path="/foo/bar")
        self.mount_b.wait_until_mounted()
        self.mount_b.create_destroy()
        self.mount_b.umount_wait()

        # Configure the client to claim that its mount point metadata is /baz
        self.set_conf("client.badguy", "client_metadata", "root=/baz")
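        # The MDS compares the root the client advertises in its session
        # metadata against the client's path-restricted caps, and refuses
        # the session when they do not match.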
        # Try to mount the client, see that it fails
        with self.assert_cluster_log("client session with non-allowable root '/baz' denied"):
            with self.assertRaises(CommandFailedError):
                self.mount_b.mount(mount_path="/foo/bar")