# ceph/qa/tasks/cephfs/test_sessionmap.py
from StringIO import StringIO
import json
import logging
import time
from unittest import SkipTest

from tasks.cephfs.fuse_mount import FuseMount
from teuthology.exceptions import CommandFailedError
from tasks.cephfs.cephfs_test_case import CephFSTestCase

log = logging.getLogger(__name__)

class TestSessionMap(CephFSTestCase):
    CLIENTS_REQUIRED = 2
    MDSS_REQUIRED = 2

    def test_tell_session_drop(self):
        """
        That when a `tell` command is sent using the python CLI,
        its MDS session is gone after it terminates
        """
        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        status = self.fs.status()
        self.fs.rank_tell(["session", "ls"], status=status)

        ls_data = self.fs.rank_asok(['session', 'ls'], status=status)
        self.assertEqual(len(ls_data), 0)
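
    # The two connection-count tests below rely on the MDS "perf dump" output:
    # each AsyncMessenger::Worker thread reports its own msgr_active_connections
    # gauge, and the helper below sums those across workers to get the total
    # number of active messenger connections held by the rank.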

    def _get_connection_count(self, status=None):
        perf = self.fs.rank_asok(["perf", "dump"], status=status)
        conn = 0
        for module, dump in perf.iteritems():
            if "AsyncMessenger::Worker" in module:
                conn += dump['msgr_active_connections']
        return conn

    def test_tell_conn_close(self):
        """
        That when a `tell` command is sent using the python CLI,
        the conn count goes back to where it started (i.e. we aren't
        leaving connections open)
        """
        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        status = self.fs.status()
        s = self._get_connection_count(status=status)
        self.fs.rank_tell(["session", "ls"], status=status)
        e = self._get_connection_count(status=status)

        self.assertEqual(s, e)

    def test_mount_conn_close(self):
        """
        That when a client unmounts, the thread count on the MDS goes back
        to what it was before the client mounted
        """
        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        status = self.fs.status()
        s = self._get_connection_count(status=status)
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()
        self.assertGreater(self._get_connection_count(status=status), s)
        self.mount_a.umount_wait()
        e = self._get_connection_count(status=status)

        self.assertEqual(s, e)
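
    # The version-splitting test below exercises SessionMap persistence: with
    # mds_sessionmap_keys_per_op forced to 1, saving more than one dirty
    # session must be split across multiple OMAP writes (versions). The writes
    # are observed through the rank's objecter omap_wr perf counter, and the
    # on-disk table is inspected via self.fs.table_tool(["0", "show", "session"]).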

    def test_version_splitting(self):
        """
        That when many sessions are updated, they are correctly
        split into multiple versions to obey mds_sessionmap_keys_per_op
        """

        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        # Configure MDS to write one OMAP key at once
        self.set_conf('mds', 'mds_sessionmap_keys_per_op', 1)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        # I would like two MDSs, so that I can do an export dir later
        self.fs.set_max_mds(2)
        self.fs.wait_for_daemons()

        status = self.fs.status()

        # Bring the clients back
        self.mount_a.mount()
        self.mount_b.mount()
        self.mount_a.create_files()  # Kick the client into opening sessions
        self.mount_b.create_files()

        # See that they've got sessions
        self.assert_session_count(2, mds_id=self.fs.get_rank(status=status)['name'])

        # See that we persist their sessions
        self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
        table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
        log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
        self.assertEqual(table_json['0']['result'], 0)
        self.assertEqual(len(table_json['0']['data']['sessions']), 2)

        # Now, induce a "force_open_sessions" event by exporting a dir
        self.mount_a.run_shell(["mkdir", "bravo"])
        self.mount_a.run_shell(["touch", "bravo/file"])
        self.mount_b.run_shell(["ls", "-l", "bravo/file"])

        def get_omap_wrs():
            return self.fs.rank_asok(['perf', 'dump', 'objecter'], rank=1, status=status)['objecter']['omap_wr']

        # Flush so that there are no dirty sessions on rank 1
        self.fs.rank_asok(["flush", "journal"], rank=1, status=status)

        # Export so that we get a force_open to rank 1 for the two sessions from rank 0
        initial_omap_wrs = get_omap_wrs()
        self.fs.rank_asok(['export', 'dir', '/bravo', '1'], rank=0, status=status)

        # This is the critical (if rather subtle) check: that in the process of doing an export dir,
        # we hit force_open_sessions, and as a result we end up writing out the sessionmap.  There
        # will be two sessions dirtied here, and because we have set keys_per_op to 1, we should see
        # a single session get written out (the first of the two, triggered by the second getting marked
        # dirty)
        # The number of writes is two per session, because the header (sessionmap version) update and
        # KV write both count. Also, multiply by 2 for each openfile table update.
        self.wait_until_true(
            lambda: get_omap_wrs() - initial_omap_wrs == 2*2,
            timeout=30  # Long enough for an export to get acked
        )

        # Now end our sessions and check the backing sessionmap is updated correctly
        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        # In-memory sessionmap check
        self.assert_session_count(0, mds_id=self.fs.get_rank(status=status)['name'])

        # On-disk sessionmap check
        self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
        table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
        log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
        self.assertEqual(table_json['0']['result'], 0)
        self.assertEqual(len(table_json['0']['data']['sessions']), 0)
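
    # The two helpers below set up a cephx-restricted client for
    # test_session_reject: _sudo_write_file pushes the generated keyring onto
    # the client host as root (the same idea as teuthology's sudo_write_file),
    # and _configure_auth creates the client entity and points its mount at
    # that keyring via ceph.conf.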

    def _sudo_write_file(self, remote, path, data):
        """
        Write data to a remote file as super user

        :param remote: Remote site.
        :param path: Path on the remote being written to.
        :param data: Data to be written.

        Both perms and owner are passed directly to chmod.
        """
        remote.run(
            args=[
                'sudo',
                'python',
                '-c',
                'import shutil, sys; shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))',
                path,
            ],
            stdin=data,
        )

    def _configure_auth(self, mount, id_name, mds_caps, osd_caps=None, mon_caps=None):
        """
        Set up auth credentials for a client mount, and write out the keyring
        for the client to use.
        """
        if osd_caps is None:
            osd_caps = "allow rw"

        if mon_caps is None:
            mon_caps = "allow r"

        out = self.fs.mon_manager.raw_cluster_cmd(
            "auth", "get-or-create", "client.{name}".format(name=id_name),
            "mds", mds_caps,
            "osd", osd_caps,
            "mon", mon_caps
        )
        mount.client_id = id_name
        self._sudo_write_file(mount.client_remote, mount.get_keyring_path(), out)
        self.set_conf("client.{name}".format(name=id_name), "keyring", mount.get_keyring_path())
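
    # Illustrative CLI equivalent of what _configure_auth runs for the
    # "badguy" client created in test_session_reject below (assuming the
    # default osd/mon caps above):
    #   ceph auth get-or-create client.badguy \
    #       mds 'allow rw path=/foo/bar' osd 'allow rw' mon 'allow r'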

    def test_session_reject(self):
        if not isinstance(self.mount_a, FuseMount):
            raise SkipTest("Requires FUSE client to inject client metadata")

        self.mount_a.run_shell(["mkdir", "foo"])
        self.mount_a.run_shell(["mkdir", "foo/bar"])
        self.mount_a.umount_wait()

        # Mount B will be my rejected client
        self.mount_b.umount_wait()

        # Configure a client that is limited to /foo/bar
        self._configure_auth(self.mount_b, "badguy", "allow rw path=/foo/bar")
        # Check he can mount that dir and do IO
        self.mount_b.mount(mount_path="/foo/bar")
        self.mount_b.wait_until_mounted()
        self.mount_b.create_destroy()
        self.mount_b.umount_wait()

        # Configure the client to claim that its mount point metadata is /baz
        self.set_conf("client.badguy", "client_metadata", "root=/baz")
        # Try to mount the client, see that it fails
        with self.assert_cluster_log("client session with non-allowable root '/baz' denied"):
            with self.assertRaises(CommandFailedError):
                self.mount_b.mount(mount_path="/foo/bar")

    def test_session_evict_blacklisted(self):
        """
        Check that mds evicts blacklisted client
        """
        if not isinstance(self.mount_a, FuseMount):
            self.skipTest("Requires FUSE client to use is_blacklisted()")

        self.fs.set_max_mds(2)
        self.fs.wait_for_daemons()
        status = self.fs.status()

        self.mount_a.run_shell(["mkdir", "d0", "d1"])
        self.mount_a.setfattr("d0", "ceph.dir.pin", "0")
        self.mount_a.setfattr("d1", "ceph.dir.pin", "1")
        self._wait_subtrees(status, 0, [('/d0', 0), ('/d1', 1)])

        self.mount_a.run_shell(["touch", "d0/f0"])
        self.mount_a.run_shell(["touch", "d1/f0"])
        self.mount_b.run_shell(["touch", "d0/f1"])
        self.mount_b.run_shell(["touch", "d1/f1"])

        self.assert_session_count(2, mds_id=self.fs.get_rank(rank=0, status=status)['name'])
        self.assert_session_count(2, mds_id=self.fs.get_rank(rank=1, status=status)['name'])

        mount_a_client_id = self.mount_a.get_global_id()
        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id],
                         mds_id=self.fs.get_rank(rank=0, status=status)['name'])
        self.wait_until_true(lambda: self.mount_a.is_blacklisted(), timeout=30)

        # 10 seconds should be enough for evicting client
        time.sleep(10)
        self.assert_session_count(1, mds_id=self.fs.get_rank(rank=0, status=status)['name'])
        self.assert_session_count(1, mds_id=self.fs.get_rank(rank=1, status=status)['name'])

        self.mount_a.kill_cleanup()
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()