]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/tests/test_prometheus.py
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / pybind / mgr / dashboard / tests / test_prometheus.py
1 # -*- coding: utf-8 -*-
2 # pylint: disable=protected-access
3 from . import ControllerTestCase
4 from .. import mgr
5 from ..controllers import BaseController, Controller
6 from ..controllers.prometheus import Prometheus, PrometheusReceiver, PrometheusNotifications
7
8
@Controller('alertmanager/mocked/api/v1/alerts', secure=False)
class AlertManagerMockInstance(BaseController):
    """Stand-in for an Alertmanager alerts endpoint used by the tests.

    It is registered through ``@Controller`` under
    ``alertmanager/mocked/api/v1/alerts`` with ``secure=False`` and answers
    every call with a fixed text that embeds the requested path.
    """

    def __call__(self, path, **params):
        """Return a canned reply naming ``path``; ``params`` are accepted but unused."""
        reply_template = 'Some Api {}'
        return reply_template.format(path)
13
14
class PrometheusControllerTest(ControllerTestCase):
    """Tests for the Prometheus, PrometheusNotifications and PrometheusReceiver controllers.

    Each test starts by resetting ``PrometheusReceiver.notifications`` to an
    empty list, because that list is a class attribute and would otherwise
    carry entries over from one test to the next.
    """

    @classmethod
    def setup_server(cls):
        """Point the Alertmanager option at the mocked endpoint and register the controllers."""
        settings = {
            # NOTE(review): 54583 is presumably the port of the local test
            # server that also serves AlertManagerMockInstance -- confirm.
            'ALERTMANAGER_API_HOST': 'http://localhost:{}/alertmanager/mocked/'.format(54583)
        }
        # Option lookups through mgr.get_module_option are answered from the
        # dict above; unknown keys yield None (dict.get semantics).
        mgr.get_module_option.side_effect = settings.get
        # Turn the 'authenticate' tool off for the controllers that are
        # queried below, so the requests are sent without logging in.
        Prometheus._cp_config['tools.authenticate.on'] = False
        PrometheusNotifications._cp_config['tools.authenticate.on'] = False
        cls.setup_controllers([AlertManagerMockInstance, Prometheus,
                               PrometheusNotifications, PrometheusReceiver])

    def test_list(self):
        # Listing through the Prometheus controller must succeed.
        self._get('/api/prometheus')
        self.assertStatus(200)

    def test_post_on_receiver(self):
        # A posted payload is stored as a notification and gets a 'notified'
        # field added to it.
        PrometheusReceiver.notifications = []
        self._post('/api/prometheus_receiver', {'name': 'foo'})
        self.assertEqual(len(PrometheusReceiver.notifications), 1)
        notification = PrometheusReceiver.notifications[0]
        self.assertEqual(notification['name'], 'foo')
        # assertGreater reports the actual length when the check fails.
        self.assertGreater(len(notification['notified']), 20)

    def test_get_empty_list_with_no_notifications(self):
        # Without stored notifications both the plain query and the
        # 'from=last' query return an empty list.
        PrometheusReceiver.notifications = []
        self._get('/api/prometheus/notifications')
        self.assertStatus(200)
        self.assertJsonBody([])
        self._get('/api/prometheus/notifications?from=last')
        self.assertStatus(200)
        self.assertJsonBody([])

    def test_get_all_notification(self):
        # The plain query returns every stored notification.
        PrometheusReceiver.notifications = []
        self._post('/api/prometheus_receiver', {'name': 'foo'})
        self._post('/api/prometheus_receiver', {'name': 'bar'})
        self._get('/api/prometheus/notifications')
        self.assertStatus(200)
        self.assertJsonBody(PrometheusReceiver.notifications)

    def test_get_last_notification_with_use_of_last_keyword(self):
        # 'from=last' returns only the most recently stored notification.
        PrometheusReceiver.notifications = []
        self._post('/api/prometheus_receiver', {'name': 'foo'})
        self._post('/api/prometheus_receiver', {'name': 'bar'})
        self._get('/api/prometheus/notifications?from=last')
        self.assertStatus(200)
        last = PrometheusReceiver.notifications[1]
        self.assertJsonBody([last])

    def test_get_no_notification_with_unknown_id(self):
        # A 'from' value that matches no notification yields an empty list.
        PrometheusReceiver.notifications = []
        self._post('/api/prometheus_receiver', {'name': 'foo'})
        self._post('/api/prometheus_receiver', {'name': 'bar'})
        self._get('/api/prometheus/notifications?from=42')
        self.assertStatus(200)
        self.assertJsonBody([])

    def test_get_no_notification_since_with_last_notification(self):
        # Asking for everything after the only notification yields nothing.
        PrometheusReceiver.notifications = []
        self._post('/api/prometheus_receiver', {'name': 'foo'})
        notification = PrometheusReceiver.notifications[0]
        self._get('/api/prometheus/notifications?from=' + notification['id'])
        self.assertStatus(200)
        self.assertJsonBody([])

    def test_get_notifications_since_last_notification(self):
        # Asking for everything after the first notification returns the two
        # notifications that were stored after it, in order.
        PrometheusReceiver.notifications = []
        self._post('/api/prometheus_receiver', {'name': 'foobar'})
        next_to_last = PrometheusReceiver.notifications[0]
        self._post('/api/prometheus_receiver', {'name': 'foo'})
        self._post('/api/prometheus_receiver', {'name': 'bar'})
        self._get('/api/prometheus/notifications?from=' + next_to_last['id'])
        # Check the status as well, like every other GET test in this class.
        self.assertStatus(200)
        fore_last = PrometheusReceiver.notifications[1]
        last = PrometheusReceiver.notifications[2]
        self.assertEqual(self.jsonBody(), [fore_last, last])