]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/tests/test_api_auditing.py
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / pybind / mgr / dashboard / tests / test_api_auditing.py
1 # -*- coding: utf-8 -*-
2 from __future__ import absolute_import
3
4 import re
5 import json
6 import cherrypy
7 import mock
8
9 from . import ControllerTestCase, KVStoreMockMixin
10 from ..controllers import RESTController, Controller
11 from ..tools import RequestLoggingTool
12 from .. import mgr
13
14
15 # pylint: disable=W0613
16 @Controller('/foo', secure=False)
17 class FooResource(RESTController):
18 def create(self, password):
19 pass
20
21 def get(self, key):
22 pass
23
24 def delete(self, key):
25 pass
26
27 def set(self, key, password, secret_key=None):
28 pass
29
30
31 class ApiAuditingTest(ControllerTestCase, KVStoreMockMixin):
32
33 def __init__(self, *args, **kwargs):
34 cherrypy.tools.request_logging = RequestLoggingTool()
35 cherrypy.config.update({'tools.request_logging.on': True})
36 super(ApiAuditingTest, self).__init__(*args, **kwargs)
37
38 @classmethod
39 def setup_server(cls):
40 cls.setup_controllers([FooResource])
41
42 def setUp(self):
43 self.mock_kv_store()
44 mgr.cluster_log = mock.Mock()
45 mgr.set_module_option('AUDIT_API_ENABLED', True)
46 mgr.set_module_option('AUDIT_API_LOG_PAYLOAD', True)
47
48 def _validate_cluster_log_msg(self, path, method, user, params):
49 channel, _, msg = mgr.cluster_log.call_args_list[0][0]
50 self.assertEqual(channel, 'audit')
51 pattern = r'^\[DASHBOARD\] from=\'(.+)\' path=\'(.+)\' ' \
52 'method=\'(.+)\' user=\'(.+)\' params=\'(.+)\'$'
53 m = re.match(pattern, msg)
54 self.assertEqual(m.group(2), path)
55 self.assertEqual(m.group(3), method)
56 self.assertEqual(m.group(4), user)
57 self.assertDictEqual(json.loads(m.group(5)), params)
58
59 def test_no_audit(self):
60 mgr.set_module_option('AUDIT_API_ENABLED', False)
61 self._delete('/foo/test1')
62 mgr.cluster_log.assert_not_called()
63
64 def test_no_payload(self):
65 mgr.set_module_option('AUDIT_API_LOG_PAYLOAD', False)
66 self._delete('/foo/test1')
67 _, _, msg = mgr.cluster_log.call_args_list[0][0]
68 self.assertNotIn('params=', msg)
69
70 def test_no_audit_get(self):
71 self._get('/foo/test1')
72 mgr.cluster_log.assert_not_called()
73
74 def test_audit_put(self):
75 self._put('/foo/test1', {'password': 'y', 'secret_key': 1234})
76 mgr.cluster_log.assert_called_once()
77 self._validate_cluster_log_msg('/foo/test1', 'PUT', 'None',
78 {'key': 'test1',
79 'password': '***',
80 'secret_key': '***'})
81
82 def test_audit_post(self):
83 with mock.patch('dashboard.services.auth.JwtManager.get_username',
84 return_value='hugo'):
85 self._post('/foo?password=1234')
86 mgr.cluster_log.assert_called_once()
87 self._validate_cluster_log_msg('/foo', 'POST', 'hugo',
88 {'password': '***'})
89
90 def test_audit_delete(self):
91 self._delete('/foo/test1')
92 mgr.cluster_log.assert_called_once()
93 self._validate_cluster_log_msg('/foo/test1', 'DELETE',
94 'None', {'key': 'test1'})