]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/tests/test_prometheus.py
3385d66a974e4f559d27baa82bfc16846c5451f9
[ceph.git] / ceph / src / pybind / mgr / dashboard / tests / test_prometheus.py
1 # -*- coding: utf-8 -*-
2 # pylint: disable=protected-access
3 try:
4 from mock import patch
5 except ImportError:
6 from unittest.mock import patch
7
8 from . import ControllerTestCase
9 from .. import mgr
10 from ..controllers.prometheus import Prometheus, PrometheusReceiver, PrometheusNotifications
11
12
13 class PrometheusControllerTest(ControllerTestCase):
14 alert_host = 'http://alertmanager:9093/mock'
15 alert_host_api = alert_host + '/api/v1'
16
17 prometheus_host = 'http://prometheus:9090/mock'
18 prometheus_host_api = prometheus_host + '/api/v1'
19
20 @classmethod
21 def setup_server(cls):
22 settings = {
23 'ALERTMANAGER_API_HOST': cls.alert_host,
24 'PROMETHEUS_API_HOST': cls.prometheus_host
25 }
26 mgr.get_module_option.side_effect = settings.get
27 Prometheus._cp_config['tools.authenticate.on'] = False
28 PrometheusNotifications._cp_config['tools.authenticate.on'] = False
29 cls.setup_controllers([Prometheus, PrometheusNotifications, PrometheusReceiver])
30
31 def test_rules(self):
32 with patch('requests.request') as mock_request:
33 self._get('/api/prometheus/rules')
34 mock_request.assert_called_with('GET', self.prometheus_host_api + '/rules',
35 json=None, params={})
36
37 def test_list(self):
38 with patch('requests.request') as mock_request:
39 self._get('/api/prometheus')
40 mock_request.assert_called_with('GET', self.alert_host_api + '/alerts',
41 json=None, params={})
42
43 def test_get_silences(self):
44 with patch('requests.request') as mock_request:
45 self._get('/api/prometheus/silences')
46 mock_request.assert_called_with('GET', self.alert_host_api + '/silences',
47 json=None, params={})
48
49 def test_add_silence(self):
50 with patch('requests.request') as mock_request:
51 self._post('/api/prometheus/silence', {'id': 'new-silence'})
52 mock_request.assert_called_with('POST', self.alert_host_api + '/silences',
53 params=None, json={'id': 'new-silence'})
54
55 def test_update_silence(self):
56 with patch('requests.request') as mock_request:
57 self._post('/api/prometheus/silence', {'id': 'update-silence'})
58 mock_request.assert_called_with('POST', self.alert_host_api + '/silences',
59 params=None, json={'id': 'update-silence'})
60
61 def test_expire_silence(self):
62 with patch('requests.request') as mock_request:
63 self._delete('/api/prometheus/silence/0')
64 mock_request.assert_called_with('DELETE', self.alert_host_api + '/silence/0',
65 json=None, params=None)
66
67 def test_silences_empty_delete(self):
68 with patch('requests.request') as mock_request:
69 self._delete('/api/prometheus/silence')
70 mock_request.assert_not_called()
71
72 def test_post_on_receiver(self):
73 PrometheusReceiver.notifications = []
74 self._post('/api/prometheus_receiver', {'name': 'foo'})
75 self.assertEqual(len(PrometheusReceiver.notifications), 1)
76 notification = PrometheusReceiver.notifications[0]
77 self.assertEqual(notification['name'], 'foo')
78 self.assertTrue(len(notification['notified']) > 20)
79
80 def test_get_empty_list_with_no_notifications(self):
81 PrometheusReceiver.notifications = []
82 self._get('/api/prometheus/notifications')
83 self.assertStatus(200)
84 self.assertJsonBody([])
85 self._get('/api/prometheus/notifications?from=last')
86 self.assertStatus(200)
87 self.assertJsonBody([])
88
89 def test_get_all_notification(self):
90 PrometheusReceiver.notifications = []
91 self._post('/api/prometheus_receiver', {'name': 'foo'})
92 self._post('/api/prometheus_receiver', {'name': 'bar'})
93 self._get('/api/prometheus/notifications')
94 self.assertStatus(200)
95 self.assertJsonBody(PrometheusReceiver.notifications)
96
97 def test_get_last_notification_with_use_of_last_keyword(self):
98 PrometheusReceiver.notifications = []
99 self._post('/api/prometheus_receiver', {'name': 'foo'})
100 self._post('/api/prometheus_receiver', {'name': 'bar'})
101 self._get('/api/prometheus/notifications?from=last')
102 self.assertStatus(200)
103 last = PrometheusReceiver.notifications[1]
104 self.assertJsonBody([last])
105
106 def test_get_no_notification_with_unknown_id(self):
107 PrometheusReceiver.notifications = []
108 self._post('/api/prometheus_receiver', {'name': 'foo'})
109 self._post('/api/prometheus_receiver', {'name': 'bar'})
110 self._get('/api/prometheus/notifications?from=42')
111 self.assertStatus(200)
112 self.assertJsonBody([])
113
114 def test_get_no_notification_since_with_last_notification(self):
115 PrometheusReceiver.notifications = []
116 self._post('/api/prometheus_receiver', {'name': 'foo'})
117 notification = PrometheusReceiver.notifications[0]
118 self._get('/api/prometheus/notifications?from=' + notification['id'])
119 self.assertStatus(200)
120 self.assertJsonBody([])
121
122 def test_get_notifications_since_last_notification(self):
123 PrometheusReceiver.notifications = []
124 self._post('/api/prometheus_receiver', {'name': 'foobar'})
125 next_to_last = PrometheusReceiver.notifications[0]
126 self._post('/api/prometheus_receiver', {'name': 'foo'})
127 self._post('/api/prometheus_receiver', {'name': 'bar'})
128 self._get('/api/prometheus/notifications?from=' + next_to_last['id'])
129 forelast = PrometheusReceiver.notifications[1]
130 last = PrometheusReceiver.notifications[2]
131 self.assertEqual(self.json_body(), [forelast, last])