]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/tests/test_prometheus.py
import ceph quincy 17.2.4
[ceph.git] / ceph / src / pybind / mgr / dashboard / tests / test_prometheus.py
1 # -*- coding: utf-8 -*-
2 # pylint: disable=protected-access
3 try:
4 from mock import patch
5 except ImportError:
6 from unittest.mock import patch
7
8 from .. import mgr
9 from ..controllers.prometheus import Prometheus, PrometheusNotifications, PrometheusReceiver
10 from ..tests import ControllerTestCase
11
12
13 class PrometheusControllerTest(ControllerTestCase):
14 alert_host = 'http://alertmanager:9093/mock'
15 alert_host_api = alert_host + '/api/v1'
16
17 prometheus_host = 'http://prometheus:9090/mock'
18 prometheus_host_api = prometheus_host + '/api/v1'
19
20 @classmethod
21 def setup_server(cls):
22 settings = {
23 'ALERTMANAGER_API_HOST': cls.alert_host,
24 'PROMETHEUS_API_HOST': cls.prometheus_host
25 }
26 mgr.get_module_option.side_effect = settings.get
27 cls.setup_controllers([Prometheus, PrometheusNotifications, PrometheusReceiver])
28
29 def test_rules(self):
30 with patch('requests.request') as mock_request:
31 self._get('/api/prometheus/rules')
32 mock_request.assert_called_with('GET', self.prometheus_host_api + '/rules',
33 json=None, params={}, verify=True)
34
35 def test_list(self):
36 with patch('requests.request') as mock_request:
37 self._get('/api/prometheus')
38 mock_request.assert_called_with('GET', self.alert_host_api + '/alerts',
39 json=None, params={}, verify=True)
40
41 def test_get_silences(self):
42 with patch('requests.request') as mock_request:
43 self._get('/api/prometheus/silences')
44 mock_request.assert_called_with('GET', self.alert_host_api + '/silences',
45 json=None, params={}, verify=True)
46
47 def test_add_silence(self):
48 with patch('requests.request') as mock_request:
49 self._post('/api/prometheus/silence', {'id': 'new-silence'})
50 mock_request.assert_called_with('POST', self.alert_host_api + '/silences',
51 params=None, json={'id': 'new-silence'},
52 verify=True)
53
54 def test_update_silence(self):
55 with patch('requests.request') as mock_request:
56 self._post('/api/prometheus/silence', {'id': 'update-silence'})
57 mock_request.assert_called_with('POST', self.alert_host_api + '/silences',
58 params=None, json={'id': 'update-silence'},
59 verify=True)
60
61 def test_expire_silence(self):
62 with patch('requests.request') as mock_request:
63 self._delete('/api/prometheus/silence/0')
64 mock_request.assert_called_with('DELETE', self.alert_host_api + '/silence/0',
65 json=None, params=None, verify=True)
66
67 def test_silences_empty_delete(self):
68 with patch('requests.request') as mock_request:
69 self._delete('/api/prometheus/silence')
70 mock_request.assert_not_called()
71
72 def test_post_on_receiver(self):
73 PrometheusReceiver.notifications = []
74 self._post('/api/prometheus_receiver', {'name': 'foo'})
75 self.assertEqual(len(PrometheusReceiver.notifications), 1)
76 notification = PrometheusReceiver.notifications[0]
77 self.assertEqual(notification['name'], 'foo')
78 self.assertTrue(len(notification['notified']) > 20)
79
80 def test_get_empty_list_with_no_notifications(self):
81 PrometheusReceiver.notifications = []
82 self._get('/api/prometheus/notifications')
83 self.assertStatus(200)
84 self.assertJsonBody([])
85 self._get('/api/prometheus/notifications?from=last')
86 self.assertStatus(200)
87 self.assertJsonBody([])
88
89 def test_get_all_notification(self):
90 PrometheusReceiver.notifications = []
91 self._post('/api/prometheus_receiver', {'name': 'foo'})
92 self._post('/api/prometheus_receiver', {'name': 'bar'})
93 self._get('/api/prometheus/notifications')
94 self.assertStatus(200)
95 self.assertJsonBody(PrometheusReceiver.notifications)
96
97 def test_get_last_notification_with_use_of_last_keyword(self):
98 PrometheusReceiver.notifications = []
99 self._post('/api/prometheus_receiver', {'name': 'foo'})
100 self._post('/api/prometheus_receiver', {'name': 'bar'})
101 self._get('/api/prometheus/notifications?from=last')
102 self.assertStatus(200)
103 last = PrometheusReceiver.notifications[1]
104 self.assertJsonBody([last])
105
106 def test_get_no_notification_with_unknown_id(self):
107 PrometheusReceiver.notifications = []
108 self._post('/api/prometheus_receiver', {'name': 'foo'})
109 self._post('/api/prometheus_receiver', {'name': 'bar'})
110 self._get('/api/prometheus/notifications?from=42')
111 self.assertStatus(200)
112 self.assertJsonBody([])
113
114 def test_get_no_notification_since_with_last_notification(self):
115 PrometheusReceiver.notifications = []
116 self._post('/api/prometheus_receiver', {'name': 'foo'})
117 notification = PrometheusReceiver.notifications[0]
118 self._get('/api/prometheus/notifications?from=' + notification['id'])
119 self.assertStatus(200)
120 self.assertJsonBody([])
121
122 def test_get_notifications_since_last_notification(self):
123 PrometheusReceiver.notifications = []
124 self._post('/api/prometheus_receiver', {'name': 'foobar'})
125 next_to_last = PrometheusReceiver.notifications[0]
126 self._post('/api/prometheus_receiver', {'name': 'foo'})
127 self._post('/api/prometheus_receiver', {'name': 'bar'})
128 self._get('/api/prometheus/notifications?from=' + next_to_last['id'])
129 forelast = PrometheusReceiver.notifications[1]
130 last = PrometheusReceiver.notifications[2]
131 self.assertEqual(self.json_body(), [forelast, last])