]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/dashboard/tests/test_api_auditing.py
import 15.2.0 Octopus source
[ceph.git] / ceph / src / pybind / mgr / dashboard / tests / test_api_auditing.py
CommitLineData
11fdf7f2
TL
1# -*- coding: utf-8 -*-
2from __future__ import absolute_import
3
4import re
5import json
6import cherrypy
9f95a23c
TL
7try:
8 import mock
9except ImportError:
10 import unittest.mock as mock
11fdf7f2
TL
11
12from . import ControllerTestCase, KVStoreMockMixin
13from ..controllers import RESTController, Controller
14from ..tools import RequestLoggingTool
15from .. import mgr
16
17
18# pylint: disable=W0613
@Controller('/foo', secure=False)
class FooResource(RESTController):
    """Stub REST resource registered under ``/foo`` with ``secure=False``.

    All handlers are deliberately empty: the resource only exists so that
    the auditing tests in this module have endpoints to send requests to.
    The argument names matter, because the tests check how parameters
    called ``password`` and ``secret_key`` show up in the audit log.
    """

    def create(self, password):
        """Accept a ``password`` argument and do nothing."""

    def get(self, key):
        """Accept a ``key`` argument and do nothing."""

    def delete(self, key):
        """Accept a ``key`` argument and do nothing."""

    def set(self, key, password, secret_key=None):
        """Accept ``key``, ``password`` and optional ``secret_key``; no-op."""
32
33
class ApiAuditingTest(ControllerTestCase, KVStoreMockMixin):
    """Check what the dashboard writes to the cluster log for API requests.

    ``mgr.cluster_log`` is replaced by a mock in ``setUp`` so that each test
    can inspect the (channel, priority, message) arguments it was called
    with after issuing a request against ``FooResource``.
    """

    def __init__(self, *args, **kwargs):
        # The request logging tool has to be registered and switched on in
        # cherrypy before the base class initialises the test case.
        cherrypy.tools.request_logging = RequestLoggingTool()
        cherrypy.config.update({'tools.request_logging.on': True})
        super(ApiAuditingTest, self).__init__(*args, **kwargs)

    @classmethod
    def setup_server(cls):
        """Register ``FooResource`` as the only controller under test."""
        cls.setup_controllers([FooResource])

    def setUp(self):
        """Install a fresh KV store and log mock; enable auditing options."""
        self.mock_kv_store()
        mgr.cluster_log = mock.Mock()
        mgr.set_module_option('AUDIT_API_ENABLED', True)
        mgr.set_module_option('AUDIT_API_LOG_PAYLOAD', True)

    def _validate_cluster_log_msg(self, path, method, user, params):
        """Assert that the first cluster log call is the expected audit entry.

        :param path: expected value of the ``path='...'`` field
        :param method: expected value of the ``method='...'`` field
        :param user: expected value of the ``user='...'`` field
        :param params: dict expected after JSON-decoding the ``params``
            field
        """
        channel, _, msg = mgr.cluster_log.call_args_list[0][0]
        self.assertEqual(channel, 'audit')
        pattern = r'^\[DASHBOARD\] from=\'(.+)\' path=\'(.+)\' ' \
                  'method=\'(.+)\' user=\'(.+)\' params=\'(.+)\'$'
        m = re.match(pattern, msg)
        # Fail with the offending message instead of an AttributeError on
        # ``None`` when the log line does not have the expected layout.
        self.assertIsNotNone(
            m, 'unexpected audit log message format: {!r}'.format(msg))
        self.assertEqual(m.group(2), path)
        self.assertEqual(m.group(3), method)
        self.assertEqual(m.group(4), user)
        self.assertDictEqual(json.loads(m.group(5)), params)

    def test_no_audit(self):
        """Nothing is logged when ``AUDIT_API_ENABLED`` is off."""
        mgr.set_module_option('AUDIT_API_ENABLED', False)
        self._delete('/foo/test1')
        mgr.cluster_log.assert_not_called()

    def test_no_payload(self):
        """The message has no ``params=`` part when payload logging is off."""
        mgr.set_module_option('AUDIT_API_LOG_PAYLOAD', False)
        self._delete('/foo/test1')
        # Same guard as the other audit tests: without it a missing log call
        # would surface as an IndexError on the next line.
        mgr.cluster_log.assert_called_once()
        _, _, msg = mgr.cluster_log.call_args_list[0][0]
        self.assertNotIn('params=', msg)

    def test_no_audit_get(self):
        """A GET request does not produce a cluster log entry."""
        self._get('/foo/test1')
        mgr.cluster_log.assert_not_called()

    def test_audit_put(self):
        """PUT is logged once, with password and secret_key shown as ``***``."""
        self._put('/foo/test1', {'password': 'y', 'secret_key': 1234})
        mgr.cluster_log.assert_called_once()
        self._validate_cluster_log_msg('/foo/test1', 'PUT', 'None',
                                       {'key': 'test1',
                                        'password': '***',
                                        'secret_key': '***'})

    def test_audit_post(self):
        """POST is logged once with the patched user name and a masked password."""
        with mock.patch('dashboard.services.auth.JwtManager.get_username',
                        return_value='hugo'):
            self._post('/foo?password=1234')
            mgr.cluster_log.assert_called_once()
            self._validate_cluster_log_msg('/foo', 'POST', 'hugo',
                                           {'password': '***'})

    def test_audit_delete(self):
        """DELETE is logged once, carrying the ``key`` path parameter."""
        self._delete('/foo/test1')
        mgr.cluster_log.assert_called_once()
        self._validate_cluster_log_msg('/foo/test1', 'DELETE',
                                       'None', {'key': 'test1'})