]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/tests/test_api_auditing.py
import ceph quincy 17.2.4
[ceph.git] / ceph / src / pybind / mgr / dashboard / tests / test_api_auditing.py
1 # -*- coding: utf-8 -*-
2
3 import json
4 import re
5
6 try:
7 import mock
8 except ImportError:
9 import unittest.mock as mock
10
11 from .. import mgr
12 from ..controllers import RESTController, Router
13 from ..tests import ControllerTestCase, KVStoreMockMixin
14
15
16 # pylint: disable=W0613
17 @Router('/foo', secure=False)
18 class FooResource(RESTController):
19 def create(self, password):
20 pass
21
22 def get(self, key):
23 pass
24
25 def delete(self, key):
26 pass
27
28 def set(self, key, password, secret_key=None):
29 pass
30
31
32 class ApiAuditingTest(ControllerTestCase, KVStoreMockMixin):
33
34 _request_logging = True
35
36 @classmethod
37 def setup_server(cls):
38 cls.setup_controllers([FooResource])
39
40 def setUp(self):
41 self.mock_kv_store()
42 mgr.cluster_log = mock.Mock()
43 mgr.set_module_option('AUDIT_API_ENABLED', True)
44 mgr.set_module_option('AUDIT_API_LOG_PAYLOAD', True)
45
46 def _validate_cluster_log_msg(self, path, method, user, params):
47 channel, _, msg = mgr.cluster_log.call_args_list[0][0]
48 self.assertEqual(channel, 'audit')
49 pattern = r'^\[DASHBOARD\] from=\'(.+)\' path=\'(.+)\' ' \
50 'method=\'(.+)\' user=\'(.+)\' params=\'(.+)\'$'
51 m = re.match(pattern, msg)
52 self.assertEqual(m.group(2), path)
53 self.assertEqual(m.group(3), method)
54 self.assertEqual(m.group(4), user)
55 self.assertDictEqual(json.loads(m.group(5)), params)
56
57 def test_no_audit(self):
58 mgr.set_module_option('AUDIT_API_ENABLED', False)
59 self._delete('/foo/test1')
60 mgr.cluster_log.assert_not_called()
61
62 def test_no_payload(self):
63 mgr.set_module_option('AUDIT_API_LOG_PAYLOAD', False)
64 self._delete('/foo/test1')
65 _, _, msg = mgr.cluster_log.call_args_list[0][0]
66 self.assertNotIn('params=', msg)
67
68 def test_no_audit_get(self):
69 self._get('/foo/test1')
70 mgr.cluster_log.assert_not_called()
71
72 def test_audit_put(self):
73 self._put('/foo/test1', {'password': 'y', 'secret_key': 1234})
74 mgr.cluster_log.assert_called_once()
75 self._validate_cluster_log_msg('/foo/test1', 'PUT', 'None',
76 {'key': 'test1',
77 'password': '***',
78 'secret_key': '***'})
79
80 def test_audit_post(self):
81 with mock.patch('dashboard.services.auth.JwtManager.get_username',
82 return_value='hugo'):
83 self._post('/foo?password=1234')
84 mgr.cluster_log.assert_called_once()
85 self._validate_cluster_log_msg('/foo', 'POST', 'hugo',
86 {'password': '***'})
87
88 def test_audit_delete(self):
89 self._delete('/foo/test1')
90 mgr.cluster_log.assert_called_once()
91 self._validate_cluster_log_msg('/foo/test1', 'DELETE',
92 'None', {'key': 'test1'})