]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/tests/test_api_auditing.py
1 # -*- coding: utf-8 -*-
9 import unittest
.mock
as mock
12 from ..controllers
import RESTController
, Router
13 from ..tests
import ControllerTestCase
, KVStoreMockMixin
16 # pylint: disable=W0613
17 @Router('/foo', secure
=False)
18 class FooResource(RESTController
):
19 def create(self
, password
):
25 def delete(self
, key
):
28 def set(self
, key
, password
, secret_key
=None):
32 class ApiAuditingTest(ControllerTestCase
, KVStoreMockMixin
):
34 _request_logging
= True
37 def setup_server(cls
):
38 cls
.setup_controllers([FooResource
])
42 mgr
.cluster_log
= mock
.Mock()
43 mgr
.set_module_option('AUDIT_API_ENABLED', True)
44 mgr
.set_module_option('AUDIT_API_LOG_PAYLOAD', True)
def _validate_cluster_log_msg(self, path, method, user, params):
    """Check the first recorded cluster-log call against expected values.

    The positional arguments of the first ``mgr.cluster_log`` call are
    unpacked as ``(channel, <ignored>, msg)``. The channel must be
    ``'audit'`` and ``msg`` must match the ``[DASHBOARD] from=... path=...
    method=... user=... params=...`` layout.

    :param path: expected value of the ``path`` field.
    :param method: expected value of the ``method`` field.
    :param user: expected value of the ``user`` field, as a string.
    :param params: dict the JSON-decoded ``params`` field must equal.
    """
    channel, _, msg = mgr.cluster_log.call_args_list[0][0]
    self.assertEqual(channel, 'audit')
    pattern = r'^\[DASHBOARD\] from=\'(.+)\' path=\'(.+)\' ' \
              'method=\'(.+)\' user=\'(.+)\' params=\'(.+)\'$'
    m = re.match(pattern, msg)
    # re.match() yields None when the message does not fit the pattern;
    # fail with a readable assertion instead of an AttributeError on
    # m.group() below.
    self.assertIsNotNone(
        m, 'unexpected audit log message format: {!r}'.format(msg))
    # Group 1 (the "from" field) is captured but not compared.
    self.assertEqual(m.group(2), path)
    self.assertEqual(m.group(3), method)
    self.assertEqual(m.group(4), user)
    self.assertDictEqual(json.loads(m.group(5)), params)
def test_no_audit(self):
    """No cluster-log call is made for a DELETE when AUDIT_API_ENABLED is False."""
    mgr.set_module_option('AUDIT_API_ENABLED', False)
    self._delete('/foo/test1')
    audit_log = mgr.cluster_log
    audit_log.assert_not_called()
def test_no_payload(self):
    """With AUDIT_API_LOG_PAYLOAD False, the logged message has no ``params=`` part."""
    mgr.set_module_option('AUDIT_API_LOG_PAYLOAD', False)
    self._delete('/foo/test1')
    # Positional arguments of the first recorded cluster_log call;
    # only the third one (the message text) is inspected.
    first_call_args = mgr.cluster_log.call_args_list[0][0]
    _, _, logged_text = first_call_args
    self.assertNotIn('params=', logged_text)
def test_no_audit_get(self):
    """A GET request does not produce any cluster-log call."""
    self._get('/foo/test1')
    audit_log = mgr.cluster_log
    audit_log.assert_not_called()
72 def test_audit_put(self):
73 self._put('/foo
/test1
', {'password
': 'y
', 'secret_key
': 1234})
74 mgr.cluster_log.assert_called_once()
75 self._validate_cluster_log_msg('/foo
/test1
', 'PUT
', 'None',
80 def test_audit_post(self):
81 with mock.patch('dashboard
.services
.auth
.JwtManager
.get_username
',
83 self._post('/foo?password
=1234')
84 mgr.cluster_log.assert_called_once()
85 self._validate_cluster_log_msg('/foo
', 'POST
', 'hugo
',
def test_audit_delete(self):
    """A DELETE request is logged exactly once, with the key as its only parameter."""
    expected_params = {'key': 'test1'}
    self._delete('/foo/test1')
    mgr.cluster_log.assert_called_once()
    self._validate_cluster_log_msg(
        '/foo/test1', 'DELETE', 'None', expected_params)