11fdf7f2 TL |
1 | # -*- coding: utf-8 -*- |
2 | from __future__ import absolute_import | |
3 | ||
4 | import re | |
5 | import json | |
6 | import cherrypy | |
9f95a23c TL |
7 | try: |
8 | import mock | |
9 | except ImportError: | |
10 | import unittest.mock as mock | |
11fdf7f2 TL |
11 | |
12 | from . import ControllerTestCase, KVStoreMockMixin | |
13 | from ..controllers import RESTController, Controller | |
14 | from ..tools import RequestLoggingTool | |
15 | from .. import mgr | |
16 | ||
17 | ||
18 | # pylint: disable=W0613 | |
# NOTE(review): secure=False presumably exempts this controller from the
# dashboard's authentication checks -- confirm against the Controller
# decorator. That is only appropriate for a test fixture like this one.
@Controller('/foo', secure=False)
class FooResource(RESTController):
    """Stub REST resource whose handlers do nothing.

    The handlers exist only so that requests carrying sensitive-looking
    parameters (``password``, ``secret_key``) reach the request logging tool.
    """

    def create(self, password):
        """Accept a ``password`` argument and do nothing with it."""

    def get(self, key):
        """Accept a ``key`` argument and do nothing with it."""

    def delete(self, key):
        """Accept a ``key`` argument and do nothing with it."""

    def set(self, key, password, secret_key=None):
        """Accept ``key``, ``password`` and ``secret_key``; do nothing."""
32 | ||
33 | ||
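class ApiAuditingTest(ControllerTestCase, KVStoreMockMixin):
    """Exercise the dashboard's API audit logging against ``FooResource``.

    Each test issues one request and inspects what was passed to the mocked
    ``mgr.cluster_log``. The expected payloads pin down that ``password`` and
    ``secret_key`` values reach the audit channel only as ``'***'``.
    """

    def __init__(self, *args, **kwargs):
        # Register the request logging tool and switch it on before the base
        # class initialiser runs.
        cherrypy.tools.request_logging = RequestLoggingTool()
        cherrypy.config.update({'tools.request_logging.on': True})
        super(ApiAuditingTest, self).__init__(*args, **kwargs)

    @classmethod
    def setup_server(cls):
        """Mount only the stub controller used by these tests."""
        cls.setup_controllers([FooResource])

    def setUp(self):
        """Give every test a fresh log mock with auditing fully enabled."""
        self.mock_kv_store()
        # A new Mock per test keeps recorded calls from carrying over.
        mgr.cluster_log = mock.Mock()
        mgr.set_module_option('AUDIT_API_ENABLED', True)
        mgr.set_module_option('AUDIT_API_LOG_PAYLOAD', True)

    def _validate_cluster_log_msg(self, path, method, user, params):
        """Check the first recorded audit entry field by field.

        :param path: expected value of the ``path`` field
        :param method: expected HTTP method
        :param user: expected user name as it is rendered in the message
        :param params: expected payload, compared after JSON decoding

        The ``from`` field is captured by the pattern but not compared.
        """
        channel, _, msg = mgr.cluster_log.call_args_list[0][0]
        self.assertEqual(channel, 'audit')
        pattern = r'^\[DASHBOARD\] from=\'(.+)\' path=\'(.+)\' ' \
            'method=\'(.+)\' user=\'(.+)\' params=\'(.+)\'$'
        m = re.match(pattern, msg)
        # Without this check a message in an unexpected format would surface
        # as an AttributeError on None instead of a readable test failure.
        self.assertIsNotNone(
            m, 'audit message does not match the expected format: %r' % (msg,))
        self.assertEqual(m.group(2), path)
        self.assertEqual(m.group(3), method)
        self.assertEqual(m.group(4), user)
        self.assertDictEqual(json.loads(m.group(5)), params)

    def test_no_audit(self):
        # With auditing switched off, a modifying request must not be logged.
        mgr.set_module_option('AUDIT_API_ENABLED', False)
        self._delete('/foo/test1')
        mgr.cluster_log.assert_not_called()

    def test_no_payload(self):
        # With payload logging off the entry is still written, but it must
        # not carry a params field at all.
        mgr.set_module_option('AUDIT_API_LOG_PAYLOAD', False)
        self._delete('/foo/test1')
        _, _, msg = mgr.cluster_log.call_args_list[0][0]
        self.assertNotIn('params=', msg)

    def test_no_audit_get(self):
        # A GET request is expected to produce no audit entry.
        self._get('/foo/test1')
        mgr.cluster_log.assert_not_called()

    def test_audit_put(self):
        # Both secrets are expected as '***', including secret_key, which is
        # sent as an integer. The user is rendered as the string 'None'.
        self._put('/foo/test1', {'password': 'y', 'secret_key': 1234})
        mgr.cluster_log.assert_called_once()
        self._validate_cluster_log_msg('/foo/test1', 'PUT', 'None',
                                       {'key': 'test1',
                                        'password': '***',
                                        'secret_key': '***'})

    def test_audit_post(self):
        # The secret is passed in the query string here. The expected path is
        # '/foo', so the query string must not show up in the path field, and
        # the password is expected as '***' in params.
        with mock.patch('dashboard.services.auth.JwtManager.get_username',
                        return_value='hugo'):
            self._post('/foo?password=1234')
            mgr.cluster_log.assert_called_once()
            self._validate_cluster_log_msg('/foo', 'POST', 'hugo',
                                           {'password': '***'})

    def test_audit_delete(self):
        # A request without sensitive parameters is logged with its params
        # unchanged.
        self._delete('/foo/test1')
        mgr.cluster_log.assert_called_once()
        self._validate_cluster_log_msg('/foo/test1', 'DELETE',
                                       'None', {'key': 'test1'})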