diff --git a/ceph/qa/tasks/cephfs/test_sessionmap.py b/ceph/qa/tasks/cephfs/test_sessionmap.py
index e9b4b646d8f184ac3638c15d64044303c6e39af2..bdcde71d095eb571b8760abce07d75aa80378fb6 100644
--- a/ceph/qa/tasks/cephfs/test_sessionmap.py
+++ b/ceph/qa/tasks/cephfs/test_sessionmap.py
@@ -1,11 +1,11 @@
-from StringIO import StringIO
+import time
 import json
 import logging
-from unittest import SkipTest
 
 from tasks.cephfs.fuse_mount import FuseMount
 from teuthology.exceptions import CommandFailedError
 from tasks.cephfs.cephfs_test_case import CephFSTestCase
+from teuthology.misc import sudo_write_file
 
 log = logging.getLogger(__name__)
 
@@ -22,47 +22,35 @@ class TestSessionMap(CephFSTestCase):
         self.mount_a.umount_wait()
         self.mount_b.umount_wait()
 
-        mds_id = self.fs.get_lone_mds_id()
-        self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")
+        status = self.fs.status()
+        self.fs.rank_tell(["session", "ls"], status=status)
 
-        ls_data = self.fs.mds_asok(['session', 'ls'])
+        ls_data = self.fs.rank_asok(['session', 'ls'], status=status)
         self.assertEqual(len(ls_data), 0)
 
-    def _get_thread_count(self, mds_id):
-        remote = self.fs.mds_daemons[mds_id].remote
-
-        ps_txt = remote.run(
-            args=["ps", "-ww", "axo", "nlwp,cmd"],
-            stdout=StringIO()
-        ).stdout.getvalue().strip()
-        lines = ps_txt.split("\n")[1:]
-
-        for line in lines:
-            if "ceph-mds" in line and not "daemon-helper" in line:
-                if line.find("-i {0}".format(mds_id)) != -1:
-                    log.info("Found ps line for daemon: {0}".format(line))
-                    return int(line.split()[0])
-
-        raise RuntimeError("No process found in ps output for MDS {0}: {1}".format(
-            mds_id, ps_txt
-        ))
+    def _get_connection_count(self, status=None):
+        perf = self.fs.rank_asok(["perf", "dump"], status=status)
+        conn = 0
+        for module, dump in perf.items():
+            if "AsyncMessenger::Worker" in module:
+                conn += dump['msgr_active_connections']
+        return conn
 
     def test_tell_conn_close(self):
         """
         That when a `tell` command is sent using the python CLI,
-        the thread count goes back to where it started (i.e. we aren't
+        the conn count goes back to where it started (i.e. we aren't
         leaving connections open)
         """
         self.mount_a.umount_wait()
         self.mount_b.umount_wait()
 
-        mds_id = self.fs.get_lone_mds_id()
-
-        initial_thread_count = self._get_thread_count(mds_id)
-        self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")
-        final_thread_count = self._get_thread_count(mds_id)
+        status = self.fs.status()
+        s = self._get_connection_count(status=status)
+        self.fs.rank_tell(["session", "ls"], status=status)
+        e = self._get_connection_count(status=status)
 
-        self.assertEqual(initial_thread_count, final_thread_count)
+        self.assertEqual(s, e)
 
     def test_mount_conn_close(self):
         """
@@ -72,16 +60,14 @@ class TestSessionMap(CephFSTestCase):
         self.mount_a.umount_wait()
         self.mount_b.umount_wait()
 
-        mds_id = self.fs.get_lone_mds_id()
-
-        initial_thread_count = self._get_thread_count(mds_id)
-        self.mount_a.mount()
-        self.mount_a.wait_until_mounted()
-        self.assertGreater(self._get_thread_count(mds_id), initial_thread_count)
+        status = self.fs.status()
+        s = self._get_connection_count(status=status)
+        self.mount_a.mount_wait()
+        self.assertGreater(self._get_connection_count(status=status), s)
         self.mount_a.umount_wait()
-        final_thread_count = self._get_thread_count(mds_id)
+        e = self._get_connection_count(status=status)
 
-        self.assertEqual(initial_thread_count, final_thread_count)
+        self.assertEqual(s, e)
 
     def test_version_splitting(self):
         """
@@ -89,56 +75,45 @@ class TestSessionMap(CephFSTestCase):
         split into multiple versions to obey mds_sessionmap_keys_per_op
         """
 
-        # Start umounted
         self.mount_a.umount_wait()
         self.mount_b.umount_wait()
 
         # Configure MDS to write one OMAP key at once
         self.set_conf('mds', 'mds_sessionmap_keys_per_op', 1)
         self.fs.mds_fail_restart()
-        self.fs.wait_for_daemons()
-
-        # I would like two MDSs, so that I can do an export dir later
-        self.fs.set_allow_multimds(True)
-        self.fs.set_max_mds(2)
-        self.fs.wait_for_daemons()
-
-        active_mds_names = self.fs.get_active_names()
-        rank_0_id = active_mds_names[0]
-        rank_1_id = active_mds_names[1]
-        log.info("Ranks 0 and 1 are {0} and {1}".format(
-            rank_0_id, rank_1_id))
+        status = self.fs.wait_for_daemons()
 
         # Bring the clients back
-        self.mount_a.mount()
-        self.mount_b.mount()
-        self.mount_a.create_files()  # Kick the client into opening sessions
-        self.mount_b.create_files()
+        self.mount_a.mount_wait()
+        self.mount_b.mount_wait()
 
         # See that they've got sessions
-        self.assert_session_count(2, mds_id=rank_0_id)
+        self.assert_session_count(2, mds_id=self.fs.get_rank(status=status)['name'])
 
         # See that we persist their sessions
-        self.fs.mds_asok(["flush", "journal"], rank_0_id)
+        self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
         table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
         log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
         self.assertEqual(table_json['0']['result'], 0)
-        self.assertEqual(len(table_json['0']['data']['Sessions']), 2)
+        self.assertEqual(len(table_json['0']['data']['sessions']), 2)
 
         # Now, induce a "force_open_sessions" event by exporting a dir
         self.mount_a.run_shell(["mkdir", "bravo"])
-        self.mount_a.run_shell(["touch", "bravo/file"])
-        self.mount_b.run_shell(["ls", "-l", "bravo/file"])
+        self.mount_a.run_shell(["touch", "bravo/file_a"])
+        self.mount_b.run_shell(["touch", "bravo/file_b"])
+
+        self.fs.set_max_mds(2)
+        status = self.fs.wait_for_daemons()
 
         def get_omap_wrs():
-            return self.fs.mds_asok(['perf', 'dump', 'objecter'], rank_1_id)['objecter']['omap_wr']
+            return self.fs.rank_asok(['perf', 'dump', 'objecter'], rank=1, status=status)['objecter']['omap_wr']
 
         # Flush so that there are no dirty sessions on rank 1
-        self.fs.mds_asok(["flush", "journal"], rank_1_id)
+        self.fs.rank_asok(["flush", "journal"], rank=1, status=status)
 
         # Export so that we get a force_open to rank 1 for the two sessions from rank 0
         initial_omap_wrs = get_omap_wrs()
-        self.fs.mds_asok(['export', 'dir', '/bravo', '1'], rank_0_id)
+        self.fs.rank_asok(['export', 'dir', '/bravo', '1'], rank=0, status=status)
 
         # This is the critical (if rather subtle) check: that in the process of doing an export dir,
         # we hit force_open_sessions, and as a result we end up writing out the sessionmap.  There
@@ -146,10 +121,10 @@ class TestSessionMap(CephFSTestCase):
         # a single session get written out (the first of the two, triggered by the second getting marked
         # dirty)
         # The number of writes is two per session, because the header (sessionmap version) update and
-        # KV write both count.
+        # KV write both count. Also, multiply by 2 for each openfile table update.
         self.wait_until_true(
-            lambda: get_omap_wrs() - initial_omap_wrs == 2,
-            timeout=10  # Long enough for an export to get acked
+            lambda: get_omap_wrs() - initial_omap_wrs == 2*2,
+            timeout=30  # Long enough for an export to get acked
         )
 
         # Now end our sessions and check the backing sessionmap is updated correctly
@@ -157,35 +132,14 @@ class TestSessionMap(CephFSTestCase):
         self.mount_b.umount_wait()
 
         # In-memory sessionmap check
-        self.assert_session_count(0, mds_id=rank_0_id)
+        self.assert_session_count(0, mds_id=self.fs.get_rank(status=status)['name'])
 
         # On-disk sessionmap check
-        self.fs.mds_asok(["flush", "journal"], rank_0_id)
+        self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
         table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
         log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
         self.assertEqual(table_json['0']['result'], 0)
-        self.assertEqual(len(table_json['0']['data']['Sessions']), 0)
-
-    def _sudo_write_file(self, remote, path, data):
-        """
-        Write data to a remote file as super user
-
-        :param remote: Remote site.
-        :param path: Path on the remote being written to.
-        :param data: Data to be written.
-
-        Both perms and owner are passed directly to chmod.
-        """
-        remote.run(
-            args=[
-                'sudo',
-                'python',
-                '-c',
-                'import shutil, sys; shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))',
-                path,
-            ],
-            stdin=data,
-        )
+        self.assertEqual(len(table_json['0']['data']['sessions']), 0)
 
     def _configure_auth(self, mount, id_name, mds_caps, osd_caps=None, mon_caps=None):
         """
@@ -206,12 +160,12 @@ class TestSessionMap(CephFSTestCase):
             "mon", mon_caps
         )
         mount.client_id = id_name
-        self._sudo_write_file(mount.client_remote, mount.get_keyring_path(), out)
+        sudo_write_file(mount.client_remote, mount.get_keyring_path(), out)
         self.set_conf("client.{name}".format(name=id_name), "keyring", mount.get_keyring_path())
 
     def test_session_reject(self):
         if not isinstance(self.mount_a, FuseMount):
-            raise SkipTest("Requires FUSE client to inject client metadata")
+            self.skipTest("Requires FUSE client to inject client metadata")
 
         self.mount_a.run_shell(["mkdir", "foo"])
         self.mount_a.run_shell(["mkdir", "foo/bar"])
@@ -223,14 +177,50 @@ class TestSessionMap(CephFSTestCase):
         # Configure a client that is limited to /foo/bar
         self._configure_auth(self.mount_b, "badguy", "allow rw path=/foo/bar")
         # Check he can mount that dir and do IO
-        self.mount_b.mount(mount_path="/foo/bar")
-        self.mount_b.wait_until_mounted()
+        self.mount_b.mount_wait(mount_path="/foo/bar")
         self.mount_b.create_destroy()
         self.mount_b.umount_wait()
 
         # Configure the client to claim that its mount point metadata is /baz
         self.set_conf("client.badguy", "client_metadata", "root=/baz")
         # Try to mount the client, see that it fails
-        with self.assert_cluster_log("client session with invalid root '/baz' denied"):
+        with self.assert_cluster_log("client session with non-allowable root '/baz' denied"):
             with self.assertRaises(CommandFailedError):
                 self.mount_b.mount(mount_path="/foo/bar")
+
+    def test_session_evict_blacklisted(self):
+        """
+        Check that mds evicts blacklisted client
+        """
+        if not isinstance(self.mount_a, FuseMount):
+            self.skipTest("Requires FUSE client to use is_blacklisted()")
+
+        self.fs.set_max_mds(2)
+        self.fs.wait_for_daemons()
+        status = self.fs.status()
+
+        self.mount_a.run_shell(["mkdir", "d0", "d1"])
+        self.mount_a.setfattr("d0", "ceph.dir.pin", "0")
+        self.mount_a.setfattr("d1", "ceph.dir.pin", "1")
+        self._wait_subtrees(status, 0, [('/d0', 0), ('/d1', 1)])
+
+        self.mount_a.run_shell(["touch", "d0/f0"])
+        self.mount_a.run_shell(["touch", "d1/f0"])
+        self.mount_b.run_shell(["touch", "d0/f1"])
+        self.mount_b.run_shell(["touch", "d1/f1"])
+
+        self.assert_session_count(2, mds_id=self.fs.get_rank(rank=0, status=status)['name'])
+        self.assert_session_count(2, mds_id=self.fs.get_rank(rank=1, status=status)['name'])
+
+        mount_a_client_id = self.mount_a.get_global_id()
+        self.fs.mds_asok(['session', 'evict', "%s" % mount_a_client_id],
+                         mds_id=self.fs.get_rank(rank=0, status=status)['name'])
+        self.wait_until_true(lambda: self.mount_a.is_blacklisted(), timeout=30)
+
+        # 10 seconds should be enough for evicting client
+        time.sleep(10)
+        self.assert_session_count(1, mds_id=self.fs.get_rank(rank=0, status=status)['name'])
+        self.assert_session_count(1, mds_id=self.fs.get_rank(rank=1, status=status)['name'])
+
+        self.mount_a.kill_cleanup()
+        self.mount_a.mount_wait()
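
For context on the new test_session_evict_blacklisted: it checks eviction through teuthology
helpers (rank_asok, assert_session_count, is_blacklisted). Outside that harness, the same two
observations can be approximated with the plain `ceph` CLI. The sketch below is illustrative
only, not part of the patch; the MDS name, the client address, and the "addr" field name in the
blacklist JSON are assumptions, and the Octopus-era spelling "blacklist" is used (later releases
rename it to "blocklist").

    import json
    import subprocess

    def mds_session_count(mds_name):
        # `ceph tell mds.<name> session ls` prints a JSON array of sessions,
        # the same data assert_session_count() inspects via the admin socket.
        out = subprocess.check_output(
            ["ceph", "tell", "mds.{}".format(mds_name), "session", "ls",
             "--format=json"])
        return len(json.loads(out))

    def client_is_blacklisted(client_addr):
        # Octopus still spells this "blacklist"; the "addr" key in the JSON
        # output is an assumption worth verifying on your release.
        out = subprocess.check_output(
            ["ceph", "osd", "blacklist", "ls", "--format=json"])
        return any(entry.get("addr", "").startswith(client_addr)
                   for entry in json.loads(out))

    if __name__ == "__main__":
        # Replace "a" with an active MDS name from `ceph fs status`, and the
        # address with the evicted client's address as reported by the MDS.
        print(mds_session_count("a"), client_is_blacklisted("192.168.0.10"))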