from StringIO import StringIO
+import time
import json
import logging
from unittest import SkipTest
self.mount_a.umount_wait()
self.mount_b.umount_wait()
- mds_id = self.fs.get_lone_mds_id()
- self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")
+ status = self.fs.status()
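+ # Send a `session ls` via rank_tell, then confirm over the admin socket that it left no session behind.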
+ self.fs.rank_tell(["session", "ls"], status=status)
- ls_data = self.fs.mds_asok(['session', 'ls'])
+ ls_data = self.fs.rank_asok(['session', 'ls'], status=status)
self.assertEqual(len(ls_data), 0)
- def _get_thread_count(self, mds_id):
- remote = self.fs.mds_daemons[mds_id].remote
-
- ps_txt = remote.run(
- args=["ps", "-ww", "axo", "nlwp,cmd"],
- stdout=StringIO()
- ).stdout.getvalue().strip()
- lines = ps_txt.split("\n")[1:]
-
- for line in lines:
- if "ceph-mds" in line and not "daemon-helper" in line:
- if line.find("-i {0}".format(mds_id)) != -1:
- log.info("Found ps line for daemon: {0}".format(line))
- return int(line.split()[0])
-
- raise RuntimeError("No process found in ps output for MDS {0}: {1}".format(
- mds_id, ps_txt
- ))
+ def _get_connection_count(self, status=None):
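+ # Count active messenger connections on the MDS by summing msgr_active_connections across all AsyncMessenger workers in its perf dump.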
+ perf = self.fs.rank_asok(["perf", "dump"], status=status)
+ conn = 0
+ for module, dump in perf.items():
+ if "AsyncMessenger::Worker" in module:
+ conn += dump['msgr_active_connections']
+ return conn
def test_tell_conn_close(self):
"""
That when a `tell` command is sent using the python CLI,
- the thread count goes back to where it started (i.e. we aren't
+ the connection count returns to where it started (i.e. we aren't
leaving connections open)
"""
self.mount_a.umount_wait()
self.mount_b.umount_wait()
- mds_id = self.fs.get_lone_mds_id()
+ status = self.fs.status()
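+ # Snapshot the connection count, run a tell, then confirm the count is unchanged (no leaked connections).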
+ s = self._get_connection_count(status=status)
+ self.fs.rank_tell(["session", "ls"], status=status)
+ e = self._get_connection_count(status=status)
- initial_thread_count = self._get_thread_count(mds_id)
- self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")
- final_thread_count = self._get_thread_count(mds_id)
-
- self.assertEqual(initial_thread_count, final_thread_count)
+ self.assertEqual(s, e)
def test_mount_conn_close(self):
"""
self.mount_a.umount_wait()
self.mount_b.umount_wait()
- mds_id = self.fs.get_lone_mds_id()
-
- initial_thread_count = self._get_thread_count(mds_id)
+ status = self.fs.status()
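+ # A client mount should add connections; a clean unmount should drop back to the starting count.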
+ s = self._get_connection_count(status=status)
self.mount_a.mount()
self.mount_a.wait_until_mounted()
- self.assertGreater(self._get_thread_count(mds_id), initial_thread_count)
+ self.assertGreater(self._get_connection_count(status=status), s)
self.mount_a.umount_wait()
- final_thread_count = self._get_thread_count(mds_id)
+ e = self._get_connection_count(status=status)
- self.assertEqual(initial_thread_count, final_thread_count)
+ self.assertEqual(s, e)
def test_version_splitting(self):
"""
self.fs.set_max_mds(2)
self.fs.wait_for_daemons()
- active_mds_names = self.fs.get_active_names()
- rank_0_id = active_mds_names[0]
- rank_1_id = active_mds_names[1]
- log.info("Ranks 0 and 1 are {0} and {1}".format(
- rank_0_id, rank_1_id))
+ status = self.fs.status()
# Bring the clients back
self.mount_a.mount()
self.mount_b.create_files()
# See that they've got sessions
- self.assert_session_count(2, mds_id=rank_0_id)
+ self.assert_session_count(2, mds_id=self.fs.get_rank(status=status)['name'])
# See that we persist their sessions
- self.fs.mds_asok(["flush", "journal"], rank_0_id)
+ self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
self.assertEqual(table_json['0']['result'], 0)
self.mount_b.run_shell(["ls", "-l", "bravo/file"])
def get_omap_wrs():
- return self.fs.mds_asok(['perf', 'dump', 'objecter'], rank_1_id)['objecter']['omap_wr']
+ return self.fs.rank_asok(['perf', 'dump', 'objecter'], rank=1, status=status)['objecter']['omap_wr']
# Flush so that there are no dirty sessions on rank 1
- self.fs.mds_asok(["flush", "journal"], rank_1_id)
+ self.fs.rank_asok(["flush", "journal"], rank=1, status=status)
# Export so that we get a force_open to rank 1 for the two sessions from rank 0
initial_omap_wrs = get_omap_wrs()
- self.fs.mds_asok(['export', 'dir', '/bravo', '1'], rank_0_id)
+ self.fs.rank_asok(['export', 'dir', '/bravo', '1'], rank=0, status=status)
# This is the critical (if rather subtle) check: that in the process of doing an export dir,
# we hit force_open_sessions, and as a result we end up writing out the sessionmap. There
self.mount_b.umount_wait()
# In-memory sessionmap check
- self.assert_session_count(0, mds_id=rank_0_id)
+ self.assert_session_count(0, mds_id=self.fs.get_rank(status=status)['name'])
# On-disk sessionmap check
- self.fs.mds_asok(["flush", "journal"], rank_0_id)
+ self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
self.assertEqual(table_json['0']['result'], 0)
with self.assert_cluster_log("client session with non-allowable root '/baz' denied"):
with self.assertRaises(CommandFailedError):
self.mount_b.mount(mount_path="/foo/bar")
+
+ def test_session_evict_blacklisted(self):
+ """
+ Check that the MDS evicts a blacklisted client from every rank, not just the rank where the eviction was requested
+ """
+ if not isinstance(self.mount_a, FuseMount):
+ self.skipTest("Requires FUSE client to use is_blacklisted()")
+
+ self.fs.set_max_mds(2)
+ self.fs.wait_for_daemons()
+ status = self.fs.status()
+
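+ # Pin d0 to rank 0 and d1 to rank 1 so the tree is split across both ranks.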
+ self.mount_a.run_shell(["mkdir", "d0", "d1"])
+ self.mount_a.setfattr("d0", "ceph.dir.pin", "0")
+ self.mount_a.setfattr("d1", "ceph.dir.pin", "1")
+ self._wait_subtrees(status, 0, [('/d0', 0), ('/d1', 1)])
+
+ self.mount_a.run_shell(["touch", "d0/f0"])
+ self.mount_a.run_shell(["touch", "d1/f0"])
+ self.mount_b.run_shell(["touch", "d0/f1"])
+ self.mount_b.run_shell(["touch", "d1/f1"])
+
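+ # Each client touched files under both pinned directories, so both ranks should hold two client sessions.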
+ self.assert_session_count(2, mds_id=self.fs.get_rank(rank=0, status=status)['name'])
+ self.assert_session_count(2, mds_id=self.fs.get_rank(rank=1, status=status)['name'])
+
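+ # Evict mount_a via rank 0; eviction blacklists the client at the OSD level, so rank 1 should drop its session as well.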
+ mount_a_client_id = self.mount_a.get_global_id()
+ self.fs.rank_asok(['session', 'evict', "%s" % mount_a_client_id],
+ rank=0, status=status)
+ self.wait_until_true(lambda: self.mount_a.is_blacklisted(), timeout=30)
+
+ # 10 seconds should be enough for every rank to evict the blacklisted client
+ time.sleep(10)
+ self.assert_session_count(1, mds_id=self.fs.get_rank(rank=0, status=status)['name'])
+ self.assert_session_count(1, mds_id=self.fs.get_rank(rank=1, status=status)['name'])
+
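+ # The blacklisted mount is dead; clean it up and remount so teardown sees a healthy client.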
+ self.mount_a.kill_cleanup()
+ self.mount_a.mount()
+ self.mount_a.wait_until_mounted()