1 # -*- coding: utf-8 -*-
2 # pylint: disable=too-many-arguments
3 from __future__
import absolute_import
12 from cherrypy
._cptools
import HandlerWrapperTool
13 from cherrypy
.test
import helper
14 from pyfakefs
import fake_filesystem
16 from mgr_module
import CLICommand
19 from ..controllers
import json_error_page
, generate_controller_routes
20 from ..services
.auth
import AuthManagerTool
21 from ..services
.exception
import dashboard_exception_handler
23 from ..plugins
import PLUGIN_MANAGER
24 from ..plugins
import feature_toggles
, debug
# noqa # pylint: disable=unused-import
27 PLUGIN_MANAGER
.hook
.init()
28 PLUGIN_MANAGER
.hook
.register_commands()
31 logger
= logging
.getLogger('tests')
34 class CmdException(Exception):
35 def __init__(self
, retcode
, message
):
36 super(CmdException
, self
).__init
__(message
)
37 self
.retcode
= retcode
40 def exec_dashboard_cmd(command_handler
, cmd
, **kwargs
):
41 cmd_dict
= {'prefix': 'dashboard {}'.format(cmd
)}
42 cmd_dict
.update(kwargs
)
43 if cmd_dict
['prefix'] not in CLICommand
.COMMANDS
:
44 ret
, out
, err
= command_handler(cmd_dict
)
46 raise CmdException(ret
, err
)
48 return json
.loads(out
)
52 ret
, out
, err
= CLICommand
.COMMANDS
[cmd_dict
['prefix']].call(mgr
, cmd_dict
,
55 raise CmdException(ret
, err
)
57 return json
.loads(out
)
62 class KVStoreMockMixin(object):
66 def mock_set_module_option(cls
, attr
, val
):
67 cls
.CONFIG_KEY_DICT
[attr
] = val
70 def mock_get_module_option(cls
, attr
, default
=None):
71 return cls
.CONFIG_KEY_DICT
.get(attr
, default
)
74 def mock_kv_store(cls
):
75 cls
.CONFIG_KEY_DICT
.clear()
76 mgr
.set_module_option
.side_effect
= cls
.mock_set_module_option
77 mgr
.get_module_option
.side_effect
= cls
.mock_get_module_option
79 mgr
.set_store
.side_effect
= cls
.mock_set_module_option
80 mgr
.get_store
.side_effect
= cls
.mock_get_module_option
83 def get_key(cls
, key
):
84 return cls
.CONFIG_KEY_DICT
.get(key
, None)
87 class CLICommandTestMixin(KVStoreMockMixin
):
89 def exec_cmd(cls
, cmd
, **kwargs
):
90 return exec_dashboard_cmd(None, cmd
, **kwargs
)
93 class FakeFsMixin(object):
94 fs
= fake_filesystem
.FakeFilesystem()
95 f_open
= fake_filesystem
.FakeFileOpen(fs
)
96 f_os
= fake_filesystem
.FakeOsModule(fs
)
98 if sys
.version_info
> (3, 0):
99 builtins_open
= 'builtins.open'
101 builtins_open
= '__builtin__.open'
104 class ControllerTestCase(helper
.CPWebCase
):
105 _endpoints_cache
= {}
108 def setup_controllers(cls
, ctrl_classes
, base_url
=''):
109 if not isinstance(ctrl_classes
, list):
110 ctrl_classes
= [ctrl_classes
]
111 mapper
= cherrypy
.dispatch
.RoutesDispatcher()
113 for ctrl
in ctrl_classes
:
116 # We need to cache the controller endpoints because
117 # BaseController#endpoints method is not idempontent
118 # and a controller might be needed by more than one
120 if ctrl
not in cls
._endpoints
_cache
:
121 ctrl_endpoints
= ctrl
.endpoints()
122 cls
._endpoints
_cache
[ctrl
] = ctrl_endpoints
124 ctrl_endpoints
= cls
._endpoints
_cache
[ctrl
]
125 for endpoint
in ctrl_endpoints
:
127 endpoint_list
.append(endpoint
)
128 endpoint_list
= sorted(endpoint_list
, key
=lambda e
: e
.url
)
129 for endpoint
in endpoint_list
:
130 generate_controller_routes(endpoint
, mapper
, base_url
)
133 cherrypy
.tree
.mount(None, config
={
134 base_url
: {'request.dispatch': mapper
}})
136 def __init__(self
, *args
, **kwargs
):
137 cherrypy
.tools
.authenticate
= AuthManagerTool()
138 cherrypy
.tools
.dashboard_exception_handler
= HandlerWrapperTool(dashboard_exception_handler
,
140 cherrypy
.config
.update({
141 'error_page.default': json_error_page
,
142 'tools.json_in.on': True,
143 'tools.json_in.force': False
145 PLUGIN_MANAGER
.hook
.configure_cherrypy(config
=cherrypy
.config
)
146 super(ControllerTestCase
, self
).__init
__(*args
, **kwargs
)
148 def _request(self
, url
, method
, data
=None, headers
=None):
154 h
= [('Content-Type', 'application/json'),
155 ('Content-Length', str(len(b
)))]
158 self
.getPage(url
, method
=method
, body
=b
, headers
=h
)
160 def _get(self
, url
, headers
=None):
161 self
._request
(url
, 'GET', headers
=headers
)
163 def _post(self
, url
, data
=None):
164 self
._request
(url
, 'POST', data
)
166 def _delete(self
, url
, data
=None):
167 self
._request
(url
, 'DELETE', data
)
169 def _put(self
, url
, data
=None):
170 self
._request
(url
, 'PUT', data
)
172 def _task_request(self
, method
, url
, data
, timeout
):
173 self
._request
(url
, method
, data
)
174 if self
.status
!= '202 Accepted':
175 logger
.info("task finished immediately")
178 res
= self
.json_body()
179 self
.assertIsInstance(res
, dict)
180 self
.assertIn('name', res
)
181 self
.assertIn('metadata', res
)
183 task_name
= res
['name']
184 task_metadata
= res
['metadata']
186 # pylint: disable=protected-access
187 class Waiter(threading
.Thread
):
188 def __init__(self
, task_name
, task_metadata
, tc
):
189 super(Waiter
, self
).__init
__()
190 self
.task_name
= task_name
191 self
.task_metadata
= task_metadata
192 self
.ev
= threading
.Event()
199 while running
and not self
.abort
:
200 logger
.info("task (%s, %s) is still executing", self
.task_name
,
203 self
.tc
._get
('/api/task?name={}'.format(self
.task_name
))
204 res
= self
.tc
.json_body()
205 for task
in res
['finished_tasks']:
206 if task
['metadata'] == self
.task_metadata
:
212 thread
= Waiter(task_name
, task_metadata
, self
)
214 status
= thread
.ev
.wait(timeout
)
219 raise Exception("Waiting for task ({}, {}) to finish timed out"
220 .format(task_name
, task_metadata
))
221 logger
.info("task (%s, %s) finished", task_name
, task_metadata
)
222 if thread
.res_task
['success']:
223 self
.body
= json
.dumps(thread
.res_task
['ret_value'])
225 self
.status
= '201 Created'
226 elif method
== 'PUT':
227 self
.status
= '200 OK'
228 elif method
== 'DELETE':
229 self
.status
= '204 No Content'
232 if 'status' in thread
.res_task
['exception']:
233 self
.status
= thread
.res_task
['exception']['status']
236 self
.body
= json
.dumps(thread
.res_task
['exception'])
238 def _task_post(self
, url
, data
=None, timeout
=60):
239 self
._task
_request
('POST', url
, data
, timeout
)
241 def _task_delete(self
, url
, timeout
=60):
242 self
._task
_request
('DELETE', url
, None, timeout
)
244 def _task_put(self
, url
, data
=None, timeout
=60):
245 self
._task
_request
('PUT', url
, data
, timeout
)
248 body_str
= self
.body
.decode('utf-8') if isinstance(self
.body
, bytes
) else self
.body
249 return json
.loads(body_str
)
251 def assertJsonBody(self
, data
, msg
=None): # noqa: N802
252 """Fail if value != self.body."""
253 json_body
= self
.json_body()
254 if data
!= json_body
:
256 msg
= 'expected body:\n%r\n\nactual body:\n%r' % (
258 self
._handlewebError
(msg
)
260 def assertInJsonBody(self
, data
, msg
=None): # noqa: N802
261 json_body
= self
.json_body()
262 if data
not in json_body
:
264 msg
= 'expected %r to be in %r' % (data
, json_body
)
265 self
._handlewebError
(msg
)