]> git.proxmox.com Git - ceph.git/blobdiff - ceph/qa/tasks/cephfs/test_sessionmap.py
import ceph nautilus 14.2.2
[ceph.git] / ceph / qa / tasks / cephfs / test_sessionmap.py
index 29dd0ff298b69704935429723d8b9590262b8dd7..8e310e25f27aca6dc70e1c103fe31640ef02354f 100644 (file)
@@ -1,4 +1,5 @@
 from StringIO import StringIO
+import time
 import json
 import logging
 from unittest import SkipTest
@@ -22,47 +23,35 @@ class TestSessionMap(CephFSTestCase):
         self.mount_a.umount_wait()
         self.mount_b.umount_wait()
 
-        mds_id = self.fs.get_lone_mds_id()
-        self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")
+        status = self.fs.status()
+        self.fs.rank_tell(["session", "ls"], status=status)
 
-        ls_data = self.fs.mds_asok(['session', 'ls'])
+        ls_data = self.fs.rank_asok(['session', 'ls'], status=status)
         self.assertEqual(len(ls_data), 0)
 
-    def _get_thread_count(self, mds_id):
-        remote = self.fs.mds_daemons[mds_id].remote
-
-        ps_txt = remote.run(
-            args=["ps", "-ww", "axo", "nlwp,cmd"],
-            stdout=StringIO()
-        ).stdout.getvalue().strip()
-        lines = ps_txt.split("\n")[1:]
-
-        for line in lines:
-            if "ceph-mds" in line and not "daemon-helper" in line:
-                if line.find("-i {0}".format(mds_id)) != -1:
-                    log.info("Found ps line for daemon: {0}".format(line))
-                    return int(line.split()[0])
-
-        raise RuntimeError("No process found in ps output for MDS {0}: {1}".format(
-            mds_id, ps_txt
-        ))
+    def _get_connection_count(self, status=None):
+        perf = self.fs.rank_asok(["perf", "dump"], status=status)
+        conn = 0
+        for module, dump in perf.iteritems():
+            if "AsyncMessenger::Worker" in module:
+                conn += dump['msgr_active_connections']
+        return conn
 
     def test_tell_conn_close(self):
         """
         That when a `tell` command is sent using the python CLI,
-        the thread count goes back to where it started (i.e. we aren't
+        the conn count goes back to where it started (i.e. we aren't
         leaving connections open)
         """
         self.mount_a.umount_wait()
         self.mount_b.umount_wait()
 
-        mds_id = self.fs.get_lone_mds_id()
+        status = self.fs.status()
+        s = self._get_connection_count(status=status)
+        self.fs.rank_tell(["session", "ls"], status=status)
+        e = self._get_connection_count(status=status)
 
-        initial_thread_count = self._get_thread_count(mds_id)
-        self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")
-        final_thread_count = self._get_thread_count(mds_id)
-
-        self.assertEqual(initial_thread_count, final_thread_count)
+        self.assertEqual(s, e)
 
     def test_mount_conn_close(self):
         """
@@ -72,16 +61,15 @@ class TestSessionMap(CephFSTestCase):
         self.mount_a.umount_wait()
         self.mount_b.umount_wait()
 
-        mds_id = self.fs.get_lone_mds_id()
-
-        initial_thread_count = self._get_thread_count(mds_id)
+        status = self.fs.status()
+        s = self._get_connection_count(status=status)
         self.mount_a.mount()
         self.mount_a.wait_until_mounted()
-        self.assertGreater(self._get_thread_count(mds_id), initial_thread_count)
+        self.assertGreater(self._get_connection_count(status=status), s)
         self.mount_a.umount_wait()
-        final_thread_count = self._get_thread_count(mds_id)
+        e = self._get_connection_count(status=status)
 
-        self.assertEqual(initial_thread_count, final_thread_count)
+        self.assertEqual(s, e)
 
     def test_version_splitting(self):
         """
@@ -102,11 +90,7 @@ class TestSessionMap(CephFSTestCase):
         self.fs.set_max_mds(2)
         self.fs.wait_for_daemons()
 
-        active_mds_names = self.fs.get_active_names()
-        rank_0_id = active_mds_names[0]
-        rank_1_id = active_mds_names[1]
-        log.info("Ranks 0 and 1 are {0} and {1}".format(
-            rank_0_id, rank_1_id))
+        status = self.fs.status()
 
         # Bring the clients back
         self.mount_a.mount()
@@ -115,10 +99,10 @@ class TestSessionMap(CephFSTestCase):
         self.mount_b.create_files()
 
         # See that they've got sessions
-        self.assert_session_count(2, mds_id=rank_0_id)
+        self.assert_session_count(2, mds_id=self.fs.get_rank(status=status)['name'])
 
         # See that we persist their sessions
-        self.fs.mds_asok(["flush", "journal"], rank_0_id)
+        self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
         table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
         log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
         self.assertEqual(table_json['0']['result'], 0)
@@ -130,14 +114,14 @@ class TestSessionMap(CephFSTestCase):
         self.mount_b.run_shell(["ls", "-l", "bravo/file"])
 
         def get_omap_wrs():
-            return self.fs.mds_asok(['perf', 'dump', 'objecter'], rank_1_id)['objecter']['omap_wr']
+            return self.fs.rank_asok(['perf', 'dump', 'objecter'], rank=1, status=status)['objecter']['omap_wr']
 
         # Flush so that there are no dirty sessions on rank 1
-        self.fs.mds_asok(["flush", "journal"], rank_1_id)
+        self.fs.rank_asok(["flush", "journal"], rank=1, status=status)
 
         # Export so that we get a force_open to rank 1 for the two sessions from rank 0
         initial_omap_wrs = get_omap_wrs()
-        self.fs.mds_asok(['export', 'dir', '/bravo', '1'], rank_0_id)
+        self.fs.rank_asok(['export', 'dir', '/bravo', '1'], rank=0, status=status)
 
         # This is the critical (if rather subtle) check: that in the process of doing an export dir,
         # we hit force_open_sessions, and as a result we end up writing out the sessionmap.  There
@@ -156,10 +140,10 @@ class TestSessionMap(CephFSTestCase):
         self.mount_b.umount_wait()
 
         # In-memory sessionmap check
-        self.assert_session_count(0, mds_id=rank_0_id)
+        self.assert_session_count(0, mds_id=self.fs.get_rank(status=status)['name'])
 
         # On-disk sessionmap check
-        self.fs.mds_asok(["flush", "journal"], rank_0_id)
+        self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
         table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
         log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
         self.assertEqual(table_json['0']['result'], 0)
@@ -233,3 +217,41 @@ class TestSessionMap(CephFSTestCase):
         with self.assert_cluster_log("client session with non-allowable root '/baz' denied"):
             with self.assertRaises(CommandFailedError):
                 self.mount_b.mount(mount_path="/foo/bar")
+
+    def test_session_evict_blacklisted(self):
+        """
+        Check that mds evicts blacklisted client
+        """
+        if not isinstance(self.mount_a, FuseMount):
+            self.skipTest("Requires FUSE client to use is_blacklisted()")
+
+        self.fs.set_max_mds(2)
+        self.fs.wait_for_daemons()
+        status = self.fs.status()
+
+        self.mount_a.run_shell(["mkdir", "d0", "d1"])
+        self.mount_a.setfattr("d0", "ceph.dir.pin", "0")
+        self.mount_a.setfattr("d1", "ceph.dir.pin", "1")
+        self._wait_subtrees(status, 0, [('/d0', 0), ('/d1', 1)])
+
+        self.mount_a.run_shell(["touch", "d0/f0"])
+        self.mount_a.run_shell(["touch", "d1/f0"])
+        self.mount_b.run_shell(["touch", "d0/f1"])
+        self.mount_b.run_shell(["touch", "d1/f1"])
+
+        self.assert_session_count(2, mds_id=self.fs.get_rank(rank=0, status=status)['name'])
+        self.assert_session_count(2, mds_id=self.fs.get_rank(rank=1, status=status)['name'])
+
+        mount_a_client_id = self.mount_a.get_global_id()
+        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id],
+                         mds_id=self.fs.get_rank(rank=0, status=status)['name'])
+        self.wait_until_true(lambda: self.mount_a.is_blacklisted(), timeout=30)
+
+        # 10 seconds should be enough for evicting client
+        time.sleep(10)
+        self.assert_session_count(1, mds_id=self.fs.get_rank(rank=0, status=status)['name'])
+        self.assert_session_count(1, mds_id=self.fs.get_rank(rank=1, status=status)['name'])
+
+        self.mount_a.kill_cleanup()
+        self.mount_a.mount()
+        self.mount_a.wait_until_mounted()