git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/tests/__init__.py
421378a155d80633dbb44e8cb3fc2322fa904203
[ceph.git] / ceph / src / pybind / mgr / dashboard / tests / __init__.py
1 # -*- coding: utf-8 -*-
2 # pylint: disable=too-many-arguments
3 from __future__ import absolute_import
4
5 import json
6 import logging
7 import threading
8 import sys
9 import time
10
11 import cherrypy
12 from cherrypy._cptools import HandlerWrapperTool
13 from cherrypy.test import helper
14 from pyfakefs import fake_filesystem
15
16 from mgr_module import CLICommand
17
18 from .. import mgr
19 from ..controllers import json_error_page, generate_controller_routes
20 from ..services.auth import AuthManagerTool
21 from ..services.exception import dashboard_exception_handler
22
23 from ..plugins import PLUGIN_MANAGER
24 from ..plugins import feature_toggles, debug # noqa # pylint: disable=unused-import
25
26
# Fire the plugin manager's ``init`` and ``register_commands`` hooks as a
# side effect of importing this test package.
PLUGIN_MANAGER.hook.init()
PLUGIN_MANAGER.hook.register_commands()


# Logger used by the test helpers defined below.
logger = logging.getLogger('tests')
32
33
class CmdException(Exception):
    """Error raised when a dashboard command returns a negative code.

    :param retcode: return code reported for the command
    :param message: error text, used as the exception message
    """

    def __init__(self, retcode, message):
        # Keep the return code on the instance so callers can inspect it.
        self.retcode = retcode
        super(CmdException, self).__init__(message)
38
39
def exec_dashboard_cmd(command_handler, cmd, **kwargs):
    """Run a ``dashboard <cmd>`` command and return its decoded output.

    The command is sent to the matching ``CLICommand.COMMANDS`` entry when
    one exists for the prefix; otherwise it is passed to ``command_handler``.

    :param command_handler: callable taking the command dict and returning
        a ``(retcode, out, err)`` tuple; only called for prefixes that are
        not in ``CLICommand.COMMANDS``
    :param cmd: command name without the leading ``dashboard``
    :param kwargs: extra entries merged into the command dict
    :return: ``out`` parsed as JSON, or ``out`` unchanged when parsing
        raises ``ValueError``
    :raises CmdException: if the returned code is negative
    """
    request = {'prefix': 'dashboard {}'.format(cmd)}
    request.update(kwargs)

    prefix = request['prefix']
    if prefix in CLICommand.COMMANDS:
        result = CLICommand.COMMANDS[prefix].call(mgr, request, None)
    else:
        result = command_handler(request)

    ret, out, err = result
    if ret < 0:
        raise CmdException(ret, err)
    try:
        return json.loads(out)
    except ValueError:
        # Output that is not JSON is handed back untouched.
        return out
60
61
class KVStoreMockMixin(object):
    """Mixin backing the mocked ``mgr`` option and store calls with a dict."""

    # Backing dict. It is defined on this class, so subclasses that do not
    # define their own CONFIG_KEY_DICT all share this one object.
    CONFIG_KEY_DICT = {}

    @classmethod
    def mock_set_module_option(cls, attr, val):
        """Store ``val`` under ``attr`` in the backing dict."""
        cls.CONFIG_KEY_DICT.update({attr: val})

    @classmethod
    def mock_get_module_option(cls, attr, default=None):
        """Return the value stored under ``attr``, or ``default`` if absent."""
        store = cls.CONFIG_KEY_DICT
        return store[attr] if attr in store else default

    @classmethod
    def mock_kv_store(cls):
        """Empty the backing dict and route the ``mgr`` mocks to it."""
        cls.CONFIG_KEY_DICT.clear()
        setter = cls.mock_set_module_option
        getter = cls.mock_get_module_option
        mgr.set_module_option.side_effect = setter
        mgr.get_module_option.side_effect = getter
        # kludge: the store calls reuse the module-option mocks, so both
        # kinds of keys end up in the same dict
        mgr.set_store.side_effect = setter
        mgr.get_store.side_effect = getter

    @classmethod
    def get_key(cls, key):
        """Return the value stored under ``key``, or ``None`` if absent."""
        return cls.CONFIG_KEY_DICT.get(key)
85
86
class CLICommandTestMixin(KVStoreMockMixin):
    """Test mixin adding a shortcut for running dashboard commands."""

    @classmethod
    def exec_cmd(cls, cmd, **kwargs):
        """Run ``dashboard <cmd>`` through ``exec_dashboard_cmd``.

        ``None`` is passed as the command handler.

        :param cmd: command name without the leading ``dashboard``
        :param kwargs: extra entries for the command dict
        :return: whatever ``exec_dashboard_cmd`` returns
        """
        no_handler = None
        return exec_dashboard_cmd(no_handler, cmd, **kwargs)
91
92
class FakeFsMixin(object):
    """Mixin exposing a pyfakefs filesystem plus its open/os wrappers."""

    # One fake filesystem, created when the class body runs; ``f_open`` and
    # ``f_os`` are both built on top of it.
    fs = fake_filesystem.FakeFilesystem()
    f_open = fake_filesystem.FakeFileOpen(fs)
    f_os = fake_filesystem.FakeOsModule(fs)

    # Dotted name of the built-in ``open`` for the running Python major
    # version (presumably used as a patch target; verify against callers).
    builtins_open = 'builtins.open' if sys.version_info > (3, 0) else '__builtin__.open'
102
103
class ControllerTestCase(helper.CPWebCase):
    """CherryPy web test case with helpers for dashboard controllers."""

    # Endpoints per controller class, filled lazily by setup_controllers().
    _endpoints_cache = {}

    @classmethod
    def setup_controllers(cls, ctrl_classes, base_url=''):
        """Instantiate the controllers and mount their routes in CherryPy.

        :param ctrl_classes: a controller class, or a list of them
        :param base_url: URL prefix handed to ``generate_controller_routes``;
            when it is the empty string the dispatcher is mounted under ``/``
        """
        if not isinstance(ctrl_classes, list):
            ctrl_classes = [ctrl_classes]
        mapper = cherrypy.dispatch.RoutesDispatcher()
        endpoints = []
        for ctrl in ctrl_classes:
            inst = ctrl()

            # BaseController#endpoints is not idempotent and a controller
            # might be needed by more than one unit test, so the endpoints
            # are computed once per controller class and reused afterwards.
            if ctrl not in cls._endpoints_cache:
                cls._endpoints_cache[ctrl] = ctrl.endpoints()

            for endpoint in cls._endpoints_cache[ctrl]:
                # Point the (cached) endpoint at the freshly built instance.
                endpoint.inst = inst
                endpoints.append(endpoint)
        endpoints.sort(key=lambda e: e.url)
        for endpoint in endpoints:
            generate_controller_routes(endpoint, mapper, base_url)
        mount_point = '/' if base_url == '' else base_url
        cherrypy.tree.mount(None, config={
            mount_point: {'request.dispatch': mapper}})

    def __init__(self, *args, **kwargs):
        """Register the dashboard tools and config, then init the test case."""
        cherrypy.tools.authenticate = AuthManagerTool()
        exception_tool = HandlerWrapperTool(dashboard_exception_handler,
                                            priority=31)
        cherrypy.tools.dashboard_exception_handler = exception_tool
        settings = {
            'error_page.default': json_error_page,
            'tools.json_in.on': True,
            'tools.json_in.force': False
        }
        cherrypy.config.update(settings)
        PLUGIN_MANAGER.hook.configure_cherrypy(config=cherrypy.config)
        super(ControllerTestCase, self).__init__(*args, **kwargs)

    def _request(self, url, method, data=None, headers=None):
        """Send a request through ``getPage``, JSON-encoding ``data``.

        :param url: request URL
        :param method: HTTP method name
        :param data: payload to send as JSON; any falsy value (``None``,
            empty dict, ...) results in a request without body
        :param headers: if truthy, used instead of the generated
            Content-Type/Content-Length headers
        """
        body = None
        request_headers = None
        if data:
            body = json.dumps(data)
            request_headers = [('Content-Type', 'application/json'),
                               ('Content-Length', str(len(body)))]
        if headers:
            request_headers = headers
        self.getPage(url, method=method, body=body, headers=request_headers)

    def _get(self, url, headers=None):
        """Send a GET request."""
        self._request(url, 'GET', headers=headers)

    def _post(self, url, data=None):
        """Send a POST request with an optional JSON payload."""
        self._request(url, 'POST', data)

    def _delete(self, url, data=None):
        """Send a DELETE request with an optional JSON payload."""
        self._request(url, 'DELETE', data)

    def _put(self, url, data=None):
        """Send a PUT request with an optional JSON payload."""
        self._request(url, 'PUT', data)

    def _task_request(self, method, url, data, timeout):
        """Send a request and, if it started a task, wait for its result.

        Any response status other than ``202 Accepted`` is left untouched.
        Otherwise a background thread polls ``/api/task`` until a finished
        task with the same metadata shows up. ``self.body`` and
        ``self.status`` are then rewritten from the task result.

        :param method: HTTP method name
        :param url: request URL
        :param data: payload to send as JSON
        :param timeout: seconds to wait for the task to finish
        :raises Exception: if the task does not finish within ``timeout``
        """
        self._request(url, method, data)
        if self.status != '202 Accepted':
            logger.info("task finished immediately")
            return

        accepted = self.json_body()
        self.assertIsInstance(accepted, dict)
        self.assertIn('name', accepted)
        self.assertIn('metadata', accepted)

        task_name = accepted['name']
        task_metadata = accepted['metadata']

        # pylint: disable=protected-access
        class Waiter(threading.Thread):
            """Thread polling the task endpoint until the task is finished."""

            def __init__(self, task_name, task_metadata, tc):
                super(Waiter, self).__init__()
                self.task_name = task_name
                self.task_metadata = task_metadata
                self.ev = threading.Event()
                self.abort = False
                self.res_task = None
                self.tc = tc

            def run(self):
                running = True
                while running and not self.abort:
                    logger.info("task (%s, %s) is still executing", self.task_name,
                                self.task_metadata)
                    time.sleep(1)
                    self.tc._get('/api/task?name={}'.format(self.task_name))
                    for task in self.tc.json_body()['finished_tasks']:
                        if task['metadata'] == self.task_metadata:
                            # task finished: record it and wake the waiter
                            running = False
                            self.res_task = task
                            self.ev.set()

        waiter = Waiter(task_name, task_metadata, self)
        waiter.start()
        if not waiter.ev.wait(timeout):
            # timeout expired: stop the polling loop before reporting
            waiter.abort = True
            waiter.join()
            raise Exception("Waiting for task ({}, {}) to finish timed out"
                            .format(task_name, task_metadata))
        logger.info("task (%s, %s) finished", task_name, task_metadata)
        if waiter.res_task['success']:
            self.body = json.dumps(waiter.res_task['ret_value'])
            # Status a synchronous call would have reported; other
            # methods keep the status they already have.
            final_status = {
                'POST': '201 Created',
                'PUT': '200 OK',
                'DELETE': '204 No Content',
            }
            if method in final_status:
                self.status = final_status[method]
            return

        if 'status' in waiter.res_task['exception']:
            self.status = waiter.res_task['exception']['status']
        else:
            self.status = 500
        self.body = json.dumps(waiter.res_task['exception'])

    def _task_post(self, url, data=None, timeout=60):
        """POST and wait for the resulting task (see ``_task_request``)."""
        self._task_request('POST', url, data, timeout)

    def _task_delete(self, url, timeout=60):
        """DELETE and wait for the resulting task (see ``_task_request``)."""
        self._task_request('DELETE', url, None, timeout)

    def _task_put(self, url, data=None, timeout=60):
        """PUT and wait for the resulting task (see ``_task_request``)."""
        self._task_request('PUT', url, data, timeout)

    def json_body(self):
        """Return ``self.body`` parsed as JSON, decoding bytes as UTF-8."""
        raw = self.body
        if isinstance(raw, bytes):
            raw = raw.decode('utf-8')
        return json.loads(raw)

    def assertJsonBody(self, data, msg=None):  # noqa: N802
        """Fail if ``data`` differs from the JSON-decoded response body."""
        actual = self.json_body()
        if data != actual:
            failure = msg if msg is not None else (
                'expected body:\n%r\n\nactual body:\n%r' % (data, actual))
            self._handlewebError(failure)

    def assertInJsonBody(self, data, msg=None):  # noqa: N802
        """Fail if ``data`` is not contained in the JSON-decoded body."""
        actual = self.json_body()
        if data not in actual:
            failure = msg if msg is not None else (
                'expected %r to be in %r' % (data, actual))
            self._handlewebError(failure)
263 if msg is None:
264 msg = 'expected %r to be in %r' % (data, json_body)
265 self._handlewebError(msg)