]>
git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/mgr/test_dashboard.py
9f904a6e0c0e0d081ab55d1fbba596fd0a55ed65
5 from requests
.adapters
import HTTPAdapter
7 from .mgr_test_case
import MgrTestCase
9 log
= logging
.getLogger(__name__
)
12 class TestDashboard(MgrTestCase
):
16 super(TestDashboard
, self
).setUp()
18 self
._assign
_ports
("dashboard", "ssl_server_port")
19 self
._load
_module
("dashboard")
20 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("dashboard",
21 "create-self-signed-cert")
24 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("config", "set", "mgr",
25 "mgr/dashboard/standby_behaviour",
27 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("config", "set", "mgr",
28 "mgr/dashboard/standby_error_status_code",
31 def test_standby(self
):
32 original_active_id
= self
.mgr_cluster
.get_active_id()
33 original_uri
= self
._get
_uri
("dashboard")
34 log
.info("Originally running manager '{}' at {}".format(
35 original_active_id
, original_uri
))
37 # Force a failover and wait until the previously active manager
38 # is listed as standby.
39 self
.mgr_cluster
.mgr_fail(original_active_id
)
41 lambda: original_active_id
in self
.mgr_cluster
.get_standby_ids(),
44 failed_active_id
= self
.mgr_cluster
.get_active_id()
45 failed_over_uri
= self
._get
_uri
("dashboard")
46 log
.info("After failover running manager '{}' at {}".format(
47 failed_active_id
, failed_over_uri
))
49 self
.assertNotEqual(original_uri
, failed_over_uri
)
51 # The original active daemon should have come back up as a standby
52 # and be doing redirects to the new active daemon.
53 r
= requests
.get(original_uri
, allow_redirects
=False, verify
=False)
54 self
.assertEqual(r
.status_code
, 303)
55 self
.assertEqual(r
.headers
['Location'], failed_over_uri
)
57 # Ensure that every URL redirects to the active daemon.
58 r
= requests
.get("{}/runtime.js".format(original_uri
),
59 allow_redirects
=False,
61 self
.assertEqual(r
.status_code
, 303)
62 self
.assertEqual(r
.headers
['Location'], failed_over_uri
)
64 def test_standby_disable_redirect(self
):
65 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("config", "set", "mgr",
66 "mgr/dashboard/standby_behaviour",
69 original_active_id
= self
.mgr_cluster
.get_active_id()
70 original_uri
= self
._get
_uri
("dashboard")
71 log
.info("Originally running manager '{}' at {}".format(
72 original_active_id
, original_uri
))
74 # Force a failover and wait until the previously active manager
75 # is listed as standby.
76 self
.mgr_cluster
.mgr_fail(original_active_id
)
78 lambda: original_active_id
in self
.mgr_cluster
.get_standby_ids(),
81 failed_active_id
= self
.mgr_cluster
.get_active_id()
82 failed_over_uri
= self
._get
_uri
("dashboard")
83 log
.info("After failover running manager '{}' at {}".format(
84 failed_active_id
, failed_over_uri
))
86 self
.assertNotEqual(original_uri
, failed_over_uri
)
88 # Redirection should be disabled now, instead a 500 must be returned.
89 r
= requests
.get(original_uri
, allow_redirects
=False, verify
=False)
90 self
.assertEqual(r
.status_code
, 500)
92 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("config", "set", "mgr",
93 "mgr/dashboard/standby_error_status_code",
96 # The customized HTTP status code (503) must be returned.
97 r
= requests
.get(original_uri
, allow_redirects
=False, verify
=False)
98 self
.assertEqual(r
.status_code
, 503)
101 base_uri
= self
._get
_uri
("dashboard")
103 # This is a very simple smoke test to check that the dashboard can
104 # give us a 200 response to requests. We're not testing that
105 # the content is correct or even renders!
114 r
= requests
.get(base_uri
+ url
, allow_redirects
=False,
116 if r
.status_code
>= 300 and r
.status_code
< 400:
117 log
.error("Unexpected redirect to: {0} (from {1})".format(
118 r
.headers
['Location'], base_uri
))
119 if r
.status_code
!= 200:
122 log
.info("{0}: {1} ({2} bytes)".format(
123 url
, r
.status_code
, len(r
.content
)
126 self
.assertListEqual(failures
, [])
129 class CustomHTTPAdapter(HTTPAdapter
):
130 def __init__(self
, ssl_version
):
131 self
.ssl_version
= ssl_version
134 def init_poolmanager(self
, *args
, **kwargs
):
135 kwargs
['ssl_version'] = self
.ssl_version
136 return super().init_poolmanager(*args
, **kwargs
)
138 uri
= self
._get
_uri
("dashboard")
141 with self
.assertRaises(requests
.exceptions
.SSLError
):
142 session
= requests
.Session()
143 session
.mount(uri
, CustomHTTPAdapter(ssl
.PROTOCOL_TLSv1
))
144 session
.get(uri
, allow_redirects
=False, verify
=False)
147 with self
.assertRaises(requests
.exceptions
.SSLError
):
148 session
= requests
.Session()
149 session
.mount(uri
, CustomHTTPAdapter(ssl
.PROTOCOL_TLSv1_1
))
150 session
.get(uri
, allow_redirects
=False, verify
=False)
152 session
= requests
.Session()
153 session
.mount(uri
, CustomHTTPAdapter(ssl
.PROTOCOL_TLS
))
154 r
= session
.get(uri
, allow_redirects
=False, verify
=False)
155 self
.assertEqual(r
.status_code
, 200)