]> git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/mgr/test_dashboard.py
9f904a6e0c0e0d081ab55d1fbba596fd0a55ed65
[ceph.git] / ceph / qa / tasks / mgr / test_dashboard.py
1 import logging
2 import ssl
3
4 import requests
5 from requests.adapters import HTTPAdapter
6
7 from .mgr_test_case import MgrTestCase
8
9 log = logging.getLogger(__name__)
10
11
12 class TestDashboard(MgrTestCase):
13 MGRS_REQUIRED = 3
14
15 def setUp(self):
16 super(TestDashboard, self).setUp()
17
18 self._assign_ports("dashboard", "ssl_server_port")
19 self._load_module("dashboard")
20 self.mgr_cluster.mon_manager.raw_cluster_cmd("dashboard",
21 "create-self-signed-cert")
22
23 def tearDown(self):
24 self.mgr_cluster.mon_manager.raw_cluster_cmd("config", "set", "mgr",
25 "mgr/dashboard/standby_behaviour",
26 "redirect")
27 self.mgr_cluster.mon_manager.raw_cluster_cmd("config", "set", "mgr",
28 "mgr/dashboard/standby_error_status_code",
29 "500")
30
31 def test_standby(self):
32 original_active_id = self.mgr_cluster.get_active_id()
33 original_uri = self._get_uri("dashboard")
34 log.info("Originally running manager '{}' at {}".format(
35 original_active_id, original_uri))
36
37 # Force a failover and wait until the previously active manager
38 # is listed as standby.
39 self.mgr_cluster.mgr_fail(original_active_id)
40 self.wait_until_true(
41 lambda: original_active_id in self.mgr_cluster.get_standby_ids(),
42 timeout=30)
43
44 failed_active_id = self.mgr_cluster.get_active_id()
45 failed_over_uri = self._get_uri("dashboard")
46 log.info("After failover running manager '{}' at {}".format(
47 failed_active_id, failed_over_uri))
48
49 self.assertNotEqual(original_uri, failed_over_uri)
50
51 # The original active daemon should have come back up as a standby
52 # and be doing redirects to the new active daemon.
53 r = requests.get(original_uri, allow_redirects=False, verify=False)
54 self.assertEqual(r.status_code, 303)
55 self.assertEqual(r.headers['Location'], failed_over_uri)
56
57 # Ensure that every URL redirects to the active daemon.
58 r = requests.get("{}/runtime.js".format(original_uri),
59 allow_redirects=False,
60 verify=False)
61 self.assertEqual(r.status_code, 303)
62 self.assertEqual(r.headers['Location'], failed_over_uri)
63
64 def test_standby_disable_redirect(self):
65 self.mgr_cluster.mon_manager.raw_cluster_cmd("config", "set", "mgr",
66 "mgr/dashboard/standby_behaviour",
67 "error")
68
69 original_active_id = self.mgr_cluster.get_active_id()
70 original_uri = self._get_uri("dashboard")
71 log.info("Originally running manager '{}' at {}".format(
72 original_active_id, original_uri))
73
74 # Force a failover and wait until the previously active manager
75 # is listed as standby.
76 self.mgr_cluster.mgr_fail(original_active_id)
77 self.wait_until_true(
78 lambda: original_active_id in self.mgr_cluster.get_standby_ids(),
79 timeout=30)
80
81 failed_active_id = self.mgr_cluster.get_active_id()
82 failed_over_uri = self._get_uri("dashboard")
83 log.info("After failover running manager '{}' at {}".format(
84 failed_active_id, failed_over_uri))
85
86 self.assertNotEqual(original_uri, failed_over_uri)
87
88 # Redirection should be disabled now, instead a 500 must be returned.
89 r = requests.get(original_uri, allow_redirects=False, verify=False)
90 self.assertEqual(r.status_code, 500)
91
92 self.mgr_cluster.mon_manager.raw_cluster_cmd("config", "set", "mgr",
93 "mgr/dashboard/standby_error_status_code",
94 "503")
95
96 # The customized HTTP status code (503) must be returned.
97 r = requests.get(original_uri, allow_redirects=False, verify=False)
98 self.assertEqual(r.status_code, 503)
99
100 def test_urls(self):
101 base_uri = self._get_uri("dashboard")
102
103 # This is a very simple smoke test to check that the dashboard can
104 # give us a 200 response to requests. We're not testing that
105 # the content is correct or even renders!
106
107 urls = [
108 "/",
109 ]
110
111 failures = []
112
113 for url in urls:
114 r = requests.get(base_uri + url, allow_redirects=False,
115 verify=False)
116 if r.status_code >= 300 and r.status_code < 400:
117 log.error("Unexpected redirect to: {0} (from {1})".format(
118 r.headers['Location'], base_uri))
119 if r.status_code != 200:
120 failures.append(url)
121
122 log.info("{0}: {1} ({2} bytes)".format(
123 url, r.status_code, len(r.content)
124 ))
125
126 self.assertListEqual(failures, [])
127
128 def test_tls(self):
129 class CustomHTTPAdapter(HTTPAdapter):
130 def __init__(self, ssl_version):
131 self.ssl_version = ssl_version
132 super().__init__()
133
134 def init_poolmanager(self, *args, **kwargs):
135 kwargs['ssl_version'] = self.ssl_version
136 return super().init_poolmanager(*args, **kwargs)
137
138 uri = self._get_uri("dashboard")
139
140 # TLSv1
141 with self.assertRaises(requests.exceptions.SSLError):
142 session = requests.Session()
143 session.mount(uri, CustomHTTPAdapter(ssl.PROTOCOL_TLSv1))
144 session.get(uri, allow_redirects=False, verify=False)
145
146 # TLSv1.1
147 with self.assertRaises(requests.exceptions.SSLError):
148 session = requests.Session()
149 session.mount(uri, CustomHTTPAdapter(ssl.PROTOCOL_TLSv1_1))
150 session.get(uri, allow_redirects=False, verify=False)
151
152 session = requests.Session()
153 session.mount(uri, CustomHTTPAdapter(ssl.PROTOCOL_TLS))
154 r = session.get(uri, allow_redirects=False, verify=False)
155 self.assertEqual(r.status_code, 200)