git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/tests/test_api_auditing.py
1 # -*- coding: utf-8 -*-
2 from __future__
import absolute_import
9 from . import ControllerTestCase
, KVStoreMockMixin
10 from ..controllers
import RESTController
, Controller
11 from ..tools
import RequestLoggingTool
15 # pylint: disable=W0613
16 @Controller('/foo', secure
=False)
17 class FooResource(RESTController
):
18 def create(self
, password
):
24 def delete(self
, key
):
27 def set(self
, key
, password
, secret_key
=None):
31 class ApiAuditingTest(ControllerTestCase
, KVStoreMockMixin
):
def __init__(self, *args, **kwargs):
    """Install and enable the request-logging tool, then build the test case.

    Both assignments below change process-wide cherrypy state (the
    ``cherrypy.tools`` toolbox and ``cherrypy.config``), so they apply to
    every request served after this test case has been constructed.
    """
    # The tool is attached to the toolbox first and switched on afterwards.
    logging_tool = RequestLoggingTool()
    cherrypy.tools.request_logging = logging_tool
    enable_request_logging = {'tools.request_logging.on': True}
    cherrypy.config.update(enable_request_logging)
    super(ApiAuditingTest, self).__init__(*args, **kwargs)
def setup_server(cls):
    """Pass FooResource as the only controller to ``setup_controllers``."""
    # NOTE(review): takes ``cls``, so presumably declared as a classmethod;
    # no decorator line is visible here - confirm against the source file.
    controllers = [FooResource]
    cls.setup_controllers(controllers)
44 mgr
.cluster_log
= mock
.Mock()
45 mgr
.set_module_option('AUDIT_API_ENABLED', True)
46 mgr
.set_module_option('AUDIT_API_LOG_PAYLOAD', True)
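def _validate_cluster_log_msg(self, path, method, user, params):
    """Check the first cluster-log call against the expected audit record.

    The first recorded call to ``mgr.cluster_log`` must use the 'audit'
    channel and carry a message laid out as
    ``[DASHBOARD] from='..' path='..' method='..' user='..' params='..'``.
    The ``from`` field (group 1) is captured but not compared.

    :param path: expected content of the ``path`` field
    :param method: expected content of the ``method`` field
    :param user: expected content of the ``user`` field
    :param params: dict that the JSON-decoded ``params`` field must equal
    """
    channel, _, msg = mgr.cluster_log.call_args_list[0][0]
    self.assertEqual(channel, 'audit')
    # Fields are delimited only by the quoted key names and every group is
    # greedy, so a value that itself contained such a delimiter would shift
    # the group boundaries; the fixtures in this file avoid that.
    pattern = r'^\[DASHBOARD\] from=\'(.+)\' path=\'(.+)\' ' \
        'method=\'(.+)\' user=\'(.+)\' params=\'(.+)\'$'
    m = re.match(pattern, msg)
    # A message that does not follow the audit layout must fail the test with
    # a readable assertion instead of an AttributeError on ``None`` below.
    self.assertIsNotNone(
        m, 'audit message does not match the expected format: %r' % (msg,))
    self.assertEqual(m.group(2), path)
    self.assertEqual(m.group(3), method)
    self.assertEqual(m.group(4), user)
    self.assertDictEqual(json.loads(m.group(5)), params)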
def test_no_audit(self):
    """With AUDIT_API_ENABLED off, a DELETE must not reach the cluster log."""
    mgr.set_module_option('AUDIT_API_ENABLED', False)
    resource = '/foo/test1'
    self._delete(resource)
    audit_sink = mgr.cluster_log
    audit_sink.assert_not_called()
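def test_no_payload(self):
    """With AUDIT_API_LOG_PAYLOAD off, the audit entry must omit ``params=``."""
    mgr.set_module_option('AUDIT_API_LOG_PAYLOAD', False)
    self._delete('/foo/test1')
    # Same precondition the other audit tests check: exactly one entry was
    # written. Without it a missing entry surfaces as an IndexError on the
    # next line rather than as a failed assertion.
    mgr.cluster_log.assert_called_once()
    _, _, msg = mgr.cluster_log.call_args_list[0][0]
    self.assertNotIn('params=', msg)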
def test_no_audit_get(self):
    """A GET on the test resource must leave ``mgr.cluster_log`` uncalled."""
    resource = '/foo/test1'
    self._get(resource)
    audit_sink = mgr.cluster_log
    audit_sink.assert_not_called()
74 def test_audit_put(self):
75 self._put('/foo
/test1
', {'password
': 'y
', 'secret_key
': 1234})
76 mgr.cluster_log.assert_called_once()
77 self._validate_cluster_log_msg('/foo
/test1
', 'PUT
', 'None',
82 def test_audit_post(self):
83 with mock.patch('dashboard
.services
.auth
.JwtManager
.get_username
',
85 self._post('/foo?password
=1234')
86 mgr.cluster_log.assert_called_once()
87 self._validate_cluster_log_msg('/foo
', 'POST
', 'hugo
',
def test_audit_delete(self):
    """A DELETE must write exactly one audit entry naming path, method and key.

    The expected ``user`` field is the literal string 'None'.
    """
    # NOTE(review): 'None' presumably reflects the unauthenticated request
    # against the secure=False controller - confirm.
    resource = '/foo/test1'
    self._delete(resource)
    mgr.cluster_log.assert_called_once()
    expected_params = {'key': 'test1'}
    self._validate_cluster_log_msg(resource, 'DELETE', 'None', expected_params)